Posted to commits@hbase.apache.org by st...@apache.org on 2013/07/02 23:35:31 UTC

svn commit: r1499118 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/ hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/ hbase-client/src/test/java/org/apach...

Author: stack
Date: Tue Jul  2 21:35:30 2013
New Revision: 1499118

URL: http://svn.apache.org/r1499118
Log:
HBASE-8737 [replication] Change replication RPC to use cell blocks

Added:
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/SizedCellScanner.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java
Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
    hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java
    hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
    hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java Tue Jul  2 21:35:30 2013
@@ -106,6 +106,23 @@ public class Delete extends Mutation imp
    * @param rowArray We make a local copy of this passed in row.
    * @param rowOffset
    * @param rowLength
+   */
+  public Delete(final byte [] rowArray, final int rowOffset, final int rowLength) {
+    this(rowArray, rowOffset, rowLength, HConstants.LATEST_TIMESTAMP);
+  }
+
+  /**
+   * Create a Delete operation for the specified row and timestamp.<p>
+   *
+   * If no further operations are done, this will delete all columns in all
+   * families of the specified row with a timestamp less than or equal to the
+   * specified timestamp.<p>
+   *
+   * This timestamp is ONLY used for a delete row operation.  If specifying
+   * families or columns, you must specify each timestamp individually.
+   * @param rowArray We make a local copy of this passed in row.
+   * @param rowOffset
+   * @param rowLength
    * @param ts maximum version timestamp (only for delete row)
    */
   public Delete(final byte [] rowArray, final int rowOffset, final int rowLength, long ts) {

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java Tue Jul  2 21:35:30 2013
@@ -70,6 +70,17 @@ public class Put extends Mutation implem
    * @param rowLength
    * @param ts
    */
+  public Put(byte [] rowArray, int rowOffset, int rowLength) {
+    this(rowArray, rowOffset, rowLength, HConstants.LATEST_TIMESTAMP);
+  }
+
+  /**
+   * We make a local copy of the passed-in row key.
+   * @param rowArray
+   * @param rowOffset
+   * @param rowLength
+   * @param ts
+   */
   public Put(byte [] rowArray, int rowOffset, int rowLength, long ts) {
     checkRow(rowArray, rowOffset, rowLength);
     this.row = Bytes.copy(rowArray, rowOffset, rowLength);
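
A quick usage sketch of the new offset/length constructors (this one and the Delete variant above); the buffer contents and offsets here are hypothetical:

  // Both constructors copy the row sub-range internally (via Bytes.copy), so the
  // caller's buffer can be reused afterwards.
  byte [] buffer = Bytes.toBytes("prefix-row1-suffix");
  Put put = new Put(buffer, 7, 4);       // row is "row1"; ts defaults to LATEST_TIMESTAMP
  Delete delete = new Delete(buffer, 7, 4);

This is what lets ReplicationSink, later in this patch, build mutations straight off a Cell's getRowArray()/getRowOffset()/getRowLength() without an intermediate row copy.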

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/IPCUtil.java Tue Jul  2 21:35:30 2013
@@ -33,10 +33,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
-import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerReportRequest;
-import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionServerStartupRequest;
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.CompressionInputStream;
@@ -47,49 +46,61 @@ import com.google.common.base.Preconditi
 import com.google.protobuf.CodedInputStream;
 import com.google.protobuf.CodedOutputStream;
 import com.google.protobuf.Message;
-import com.google.protobuf.TextFormat;
 
 /**
  * Utility to help ipc'ing.
  */
 class IPCUtil {
   public static final Log LOG = LogFactory.getLog(IPCUtil.class);
-  private final int cellBlockBuildingInitialBufferSize;
   /**
    * How much we think the decompressor will expand the original compressed content.
    */
   private final int cellBlockDecompressionMultiplier;
+  private final int cellBlockBuildingInitialBufferSize;
   private final Configuration conf;
 
   IPCUtil(final Configuration conf) {
     super();
     this.conf = conf;
-    this.cellBlockBuildingInitialBufferSize =
-      conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024);
     this.cellBlockDecompressionMultiplier =
         conf.getInt("hbase.ipc.cellblock.decompression.buffersize.multiplier", 3);
+    // Guess that 16k is a good size for rpc buffer.  Could go bigger.  See the TODO below in
+    // #buildCellBlock.
+    this.cellBlockBuildingInitialBufferSize =
+      ClassSize.align(conf.getInt("hbase.ipc.cellblock.building.initial.buffersize", 16 * 1024));
   }
 
   /**
-   * Build a cell block using passed in <code>codec</code>
+   * Puts CellScanner Cells into a cell block using passed in <code>codec</code> and/or
+   * <code>compressor</code>.
    * @param codec
    * @param compressor
-   * @Param cells
-   * @return Null or byte buffer filled with passed-in Cells encoded using passed in
-   * <code>codec</code>; the returned buffer has been flipped and is ready for
-   * reading.  Use limit to find total size.
+   * @param cellScanner
+   * @return Null or a byte buffer containing a cellblock with the passed-in Cells encoded using
+   * the passed in <code>codec</code> and/or <code>compressor</code>; the returned buffer has been
+   * flipped and is ready for reading.  Use limit to find total size.
    * @throws IOException
    */
   @SuppressWarnings("resource")
   ByteBuffer buildCellBlock(final Codec codec, final CompressionCodec compressor,
-      final CellScanner cells)
+    final CellScanner cellScanner)
   throws IOException {
-    if (cells == null) return null;
-    // TOOD: Reuse buffers?
-    // Presizing doesn't work because can't tell what size will be when serialized.
-    // BBOS will resize itself.
-    ByteBufferOutputStream baos =
-      new ByteBufferOutputStream(this.cellBlockBuildingInitialBufferSize);
+    if (cellScanner == null) return null;
+    int bufferSize = this.cellBlockBuildingInitialBufferSize;
+    if (cellScanner instanceof HeapSize) {
+      long longSize = ((HeapSize)cellScanner).heapSize();
+      // Just make sure we don't have a size bigger than an int.
+      if (longSize > Integer.MAX_VALUE) {
+        throw new IOException("Size " + longSize + " > " + Integer.MAX_VALUE);
+      }
+      bufferSize = ClassSize.align((int)longSize);
+    } // TODO: Else, get estimate on size of buffer rather than have the buffer resize.
+    // See TestIPCUtil main for experiment where we spin through the Cells getting estimate of
+    // total size before creating the buffer.  It costs some small percentage.  If we are usually
+    // within the estimated buffer size, then the cost is not worth it.  If we are often well
+    // outside the guesstimated buffer size, the processing can be done in half the time if we
+    // go w/ the estimated size rather than let the buffer resize.
+    ByteBufferOutputStream baos = new ByteBufferOutputStream(bufferSize);
     OutputStream os = baos;
     Compressor poolCompressor = null;
     try {
@@ -99,8 +110,8 @@ class IPCUtil {
         os = compressor.createOutputStream(os, poolCompressor);
       }
       Codec.Encoder encoder = codec.getEncoder(os);
-      while (cells.advance()) {
-        encoder.write(cells.current());
+      while (cellScanner.advance()) {
+        encoder.write(cellScanner.current());
       }
       encoder.flush();
     } finally {
@@ -108,9 +119,9 @@ class IPCUtil {
       if (poolCompressor != null) CodecPool.returnCompressor(poolCompressor);
     }
     if (LOG.isTraceEnabled()) {
-      if (this.cellBlockBuildingInitialBufferSize < baos.size()) {
-        LOG.trace("Buffer grew from " + this.cellBlockBuildingInitialBufferSize +
-        " to " + baos.size());
+      if (bufferSize < baos.size()) {
+        LOG.trace("Buffer grew from initial bufferSize=" + bufferSize + " to " + baos.size() +
+          "; up hbase.ipc.cellblock.building.initial.buffersize?");
       }
     }
     return baos.getByteBuffer();
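
The effect of the HeapSize check above, as a minimal sketch (package-private access to IPCUtil and the TestIPCUtil helper added below is ignored for illustration):

  // If the CellScanner also implements HeapSize, the cellblock buffer is allocated at
  // the reported size instead of the 16k default, so the ByteBufferOutputStream never
  // has to grow mid-encode.
  IPCUtil util = new IPCUtil(HBaseConfiguration.create());
  Cell [] cells = { new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"),
    Bytes.toBytes("q"), Bytes.toBytes("v")) };
  CellScanner scanner = TestIPCUtil.getSizedCellScanner(cells);  // implements HeapSize
  ByteBuffer cellBlock = util.buildCellBlock(new KeyValueCodec(), null, scanner);
  int totalBytes = cellBlock.limit();  // buffer comes back flipped, ready for reading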

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/protobuf/RequestConverter.java Tue Jul  2 21:35:30 2013
@@ -1290,4 +1290,4 @@ public final class RequestConverter {
     }
     return builder.build();
   }
-}
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java (original)
+++ hbase/trunk/hbase-client/src/test/java/org/apache/hadoop/hbase/ipc/TestIPCUtil.java Tue Jul  2 21:35:30 2013
@@ -23,21 +23,28 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
+import org.apache.commons.lang.time.StopWatch;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.SmallTests;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
+import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.io.compress.CompressionCodec;
 import org.apache.hadoop.io.compress.DefaultCodec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mortbay.log.Log;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
 
 @Category(SmallTests.class) 
 public class TestIPCUtil {
@@ -49,33 +56,137 @@ public class TestIPCUtil {
   
   @Test
   public void testBuildCellBlock() throws IOException {
-    doBuildCellBlockUndoCellBlock(new KeyValueCodec(), null);
-    doBuildCellBlockUndoCellBlock(new KeyValueCodec(), new DefaultCodec());
-    doBuildCellBlockUndoCellBlock(new KeyValueCodec(), new GzipCodec());
+    doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), null);
+    doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new DefaultCodec());
+    doBuildCellBlockUndoCellBlock(this.util, new KeyValueCodec(), new GzipCodec());
   }
 
-  void doBuildCellBlockUndoCellBlock(final Codec codec, final CompressionCodec compressor)
+  static void doBuildCellBlockUndoCellBlock(final IPCUtil util,
+      final Codec codec, final CompressionCodec compressor)
   throws IOException {
-    final int count = 10;
-    Cell [] cells = getCells(count);
-    ByteBuffer bb = this.util.buildCellBlock(codec, compressor,
-      CellUtil.createCellScanner(Arrays.asList(cells).iterator()));
-    CellScanner scanner =
-      this.util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit());
+    doBuildCellBlockUndoCellBlock(util, codec, compressor, 10, 1, false);
+  }
+
+  static void doBuildCellBlockUndoCellBlock(final IPCUtil util, final Codec codec,
+    final CompressionCodec compressor, final int count, final int size, final boolean sized)
+  throws IOException {
+    Cell [] cells = getCells(count, size);
+    CellScanner cellScanner = sized? getSizedCellScanner(cells):
+      CellUtil.createCellScanner(Arrays.asList(cells).iterator());
+    ByteBuffer bb = util.buildCellBlock(codec, compressor, cellScanner);
+    cellScanner = util.createCellScanner(codec, compressor, bb.array(), 0, bb.limit());
     int i = 0;
-    while (scanner.advance()) {
+    while (cellScanner.advance()) {
       i++;
     }
     assertEquals(count, i);
   }
 
+  static CellScanner getSizedCellScanner(final Cell [] cells) {
+    int size = 0;
+    for (Cell cell: cells) {
+      size += CellUtil.estimatedSizeOf(cell);
+    }
+    final int totalSize = ClassSize.align(size);
+    final CellScanner cellScanner = CellUtil.createCellScanner(cells);
+    return new SizedCellScanner() {
+      @Override
+      public long heapSize() {
+        return totalSize;
+      }
+
+      @Override
+      public Cell current() {
+        return cellScanner.current();
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        return cellScanner.advance();
+      }
+    };
+  }
+
   static Cell [] getCells(final int howMany) {
+    return getCells(howMany, 1024);
+  }
+
+  static Cell [] getCells(final int howMany, final int valueSize) {
     Cell [] cells = new Cell[howMany];
+    byte [] value = new byte[valueSize];
     for (int i = 0; i < howMany; i++) {
       byte [] index = Bytes.toBytes(i);
-      KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, index);
+      KeyValue kv = new KeyValue(index, Bytes.toBytes("f"), index, value);
       cells[i] = kv;
     }
     return cells;
   }
+
+  private static final String COUNT = "--count=";
+  private static final String SIZE = "--size=";
+
+  /**
+   * Prints usage and then exits w/ passed <code>errCode</code>
+   * @param errCode
+   */
+  private static void usage(final int errCode) {
+    System.out.println("Usage: IPCUtil [options]");
+    System.out.println("Micro-benchmarking how changed sizes and counts work with buffer resizing");
+    System.out.println(" --count  Count of Cells");
+    System.out.println(" --size   Size of Cell values");
+    System.out.println("Example: IPCUtil --count=1024 --size=1024");
+    System.exit(errCode);
+  }
+
+  private static void timerTests(final IPCUtil util, final int count, final int size,
+      final Codec codec, final CompressionCodec compressor)
+  throws IOException {
+    final int cycles = 1000;
+    StopWatch timer = new StopWatch();
+    timer.start();
+    for (int i = 0; i < cycles; i++) {
+      timerTest(util, timer, count, size, codec, compressor, false);
+    }
+    timer.stop();
+    Log.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + false +
+        ", count=" + count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
+    timer.reset();
+    timer.start();
+    for (int i = 0; i < cycles; i++) {
+      timerTest(util, timer, count, size, codec, compressor, true);
+    }
+    timer.stop();
+    Log.info("Codec=" + codec + ", compression=" + compressor + ", sized=" + true +
+      ", count=" + count + ", size=" + size + ", + took=" + timer.getTime() + "ms");
+  }
+
+  private static void timerTest(final IPCUtil util, final StopWatch timer, final int count,
+      final int size, final Codec codec, final CompressionCodec compressor, final boolean sized)
+  throws IOException {
+    doBuildCellBlockUndoCellBlock(util, codec, compressor, count, size, sized);
+  }
+
+  /**
+   * For running a few tests of methods herein.
+   * @param args
+   * @throws IOException
+   */
+  public static void main(String[] args) throws IOException {
+    int count = 1024;
+    int size = 10240;
+    for (String arg: args) {
+      if (arg.startsWith(COUNT)) {
+        count = Integer.parseInt(arg.replace(COUNT, ""));
+      } else if (arg.startsWith(SIZE)) {
+        size = Integer.parseInt(arg.replace(SIZE, ""));
+      } else {
+        usage(1);
+      }
+    }
+    IPCUtil util = new IPCUtil(HBaseConfiguration.create());
+    ((Log4JLogger)IPCUtil.LOG).getLogger().setLevel(Level.ALL);
+    timerTests(util, count, size,  new KeyValueCodec(), null);
+    timerTests(util, count, size,  new KeyValueCodec(), new DefaultCodec());
+    timerTests(util, count, size,  new KeyValueCodec(), new GzipCodec());
+  }
 }
\ No newline at end of file

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java Tue Jul  2 21:35:30 2013
@@ -28,6 +28,7 @@ import java.util.NavigableMap;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.util.ByteRange;
+import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * Utility methods helpful slinging {@link Cell} instances.
@@ -242,4 +243,44 @@ public final class CellUtil {
       }
     };
   }
-}
+
+  /**
+   * @param left
+   * @param right
+   * @return True if the rows in <code>left</code> and <code>right</code> Cells match
+   */
+  public static boolean matchingRow(final Cell left, final Cell right) {
+    return Bytes.equals(left.getRowArray(),  left.getRowOffset(), left.getRowLength(),
+      right.getRowArray(), right.getRowOffset(), right.getRowLength());
+  }
+
+  /**
+   * @return True if a delete type, a {@link KeyValue.Type#Delete} or
+   * a {@link KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
+   * KeyValue type.
+   */
+  public static boolean isDelete(final Cell cell) {
+    return KeyValue.isDelete(cell.getTypeByte());
+  }
+
+  /**
+   * @param cell
+   * @return Estimate of the <code>cell</code> size in bytes.
+   */
+  public static int estimatedSizeOf(final Cell cell) {
+    // If a KeyValue, we can give a good estimate of size.
+    if (cell instanceof KeyValue) {
+      return ((KeyValue)cell).getLength() + Bytes.SIZEOF_INT;
+    }
+    // TODO: Should we add to Cell a sizeOf?  Would it help? Does it make sense if Cell is
+    // prefix encoded or compressed?
+    return cell.getRowLength() + cell.getFamilyLength() +
+      cell.getQualifierLength() +
+      cell.getValueLength() +
+      // Use the KeyValue's infrastructure size presuming that another implementation would have
+      // same basic cost.
+      KeyValue.KEY_INFRASTRUCTURE_SIZE +
+      // Serialization is probably preceded by a length (it is in the KeyValueCodec at least).
+      Bytes.SIZEOF_INT;
+  }
+}
\ No newline at end of file
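
A small sketch of the three helpers added above, with hypothetical cells; this is the pattern ReplicationSink uses later in this patch:

  KeyValue a = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
    Bytes.toBytes("q1"), Bytes.toBytes("v1"));
  KeyValue b = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("f"),
    Bytes.toBytes("q2"), Bytes.toBytes("v2"));
  boolean sameRow = CellUtil.matchingRow(a, b);   // true: both rows are "row1"
  boolean del = CellUtil.isDelete(a);             // false: this ctor defaults to Type.Put
  int estimate = CellUtil.estimatedSizeOf(a);     // KeyValue length + SIZEOF_INT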

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/KeyValue.java Tue Jul  2 21:35:30 2013
@@ -20,9 +20,7 @@
 package org.apache.hadoop.hbase;
 
 import java.io.DataInput;
-import java.io.DataInputStream;
 import java.io.DataOutput;
-import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -37,7 +35,6 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;

Modified: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java (original)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/HeapSize.java Tue Jul  2 21:35:30 2013
@@ -46,5 +46,4 @@ public interface HeapSize {
    * count of payload and hosting object sizings.
   */
   public long heapSize();
-
-}
+}
\ No newline at end of file

Added: hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/SizedCellScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/SizedCellScanner.java?rev=1499118&view=auto
==============================================================================
--- hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/SizedCellScanner.java (added)
+++ hbase/trunk/hbase-common/src/main/java/org/apache/hadoop/hbase/io/SizedCellScanner.java Tue Jul  2 21:35:30 2013
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.io;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import org.apache.hadoop.hbase.CellScanner;
+
+/**
+ * A CellScanner that knows its size in memory in bytes.
+ * Used when playing the CellScanner into an in-memory buffer; knowing the size ahead of time saves
+ * on background buffer resizings.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public interface SizedCellScanner extends CellScanner, HeapSize {}
\ No newline at end of file

Modified: hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java (original)
+++ hbase/trunk/hbase-protocol/src/main/java/org/apache/hadoop/hbase/protobuf/generated/AdminProtos.java Tue Jul  2 21:35:30 2013
@@ -9894,6 +9894,10 @@ public final class AdminProtos {
     java.util.List<com.google.protobuf.ByteString> getKeyValueBytesList();
     int getKeyValueBytesCount();
     com.google.protobuf.ByteString getKeyValueBytes(int index);
+    
+    // optional int32 associatedCellCount = 3;
+    boolean hasAssociatedCellCount();
+    int getAssociatedCellCount();
   }
   public static final class WALEntry extends
       com.google.protobuf.GeneratedMessage
@@ -9951,9 +9955,20 @@ public final class AdminProtos {
       return keyValueBytes_.get(index);
     }
     
+    // optional int32 associatedCellCount = 3;
+    public static final int ASSOCIATEDCELLCOUNT_FIELD_NUMBER = 3;
+    private int associatedCellCount_;
+    public boolean hasAssociatedCellCount() {
+      return ((bitField0_ & 0x00000002) == 0x00000002);
+    }
+    public int getAssociatedCellCount() {
+      return associatedCellCount_;
+    }
+    
     private void initFields() {
       key_ = org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALKey.getDefaultInstance();
       keyValueBytes_ = java.util.Collections.emptyList();;
+      associatedCellCount_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -9981,6 +9996,9 @@ public final class AdminProtos {
       for (int i = 0; i < keyValueBytes_.size(); i++) {
         output.writeBytes(2, keyValueBytes_.get(i));
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        output.writeInt32(3, associatedCellCount_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -10003,6 +10021,10 @@ public final class AdminProtos {
         size += dataSize;
         size += 1 * getKeyValueBytesList().size();
       }
+      if (((bitField0_ & 0x00000002) == 0x00000002)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeInt32Size(3, associatedCellCount_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -10033,6 +10055,11 @@ public final class AdminProtos {
       }
       result = result && getKeyValueBytesList()
           .equals(other.getKeyValueBytesList());
+      result = result && (hasAssociatedCellCount() == other.hasAssociatedCellCount());
+      if (hasAssociatedCellCount()) {
+        result = result && (getAssociatedCellCount()
+            == other.getAssociatedCellCount());
+      }
       result = result &&
           getUnknownFields().equals(other.getUnknownFields());
       return result;
@@ -10050,6 +10077,10 @@ public final class AdminProtos {
         hash = (37 * hash) + KEYVALUEBYTES_FIELD_NUMBER;
         hash = (53 * hash) + getKeyValueBytesList().hashCode();
       }
+      if (hasAssociatedCellCount()) {
+        hash = (37 * hash) + ASSOCIATEDCELLCOUNT_FIELD_NUMBER;
+        hash = (53 * hash) + getAssociatedCellCount();
+      }
       hash = (29 * hash) + getUnknownFields().hashCode();
       return hash;
     }
@@ -10175,6 +10206,8 @@ public final class AdminProtos {
         bitField0_ = (bitField0_ & ~0x00000001);
         keyValueBytes_ = java.util.Collections.emptyList();;
         bitField0_ = (bitField0_ & ~0x00000002);
+        associatedCellCount_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000004);
         return this;
       }
       
@@ -10226,6 +10259,10 @@ public final class AdminProtos {
           bitField0_ = (bitField0_ & ~0x00000002);
         }
         result.keyValueBytes_ = keyValueBytes_;
+        if (((from_bitField0_ & 0x00000004) == 0x00000004)) {
+          to_bitField0_ |= 0x00000002;
+        }
+        result.associatedCellCount_ = associatedCellCount_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -10255,6 +10292,9 @@ public final class AdminProtos {
           }
           onChanged();
         }
+        if (other.hasAssociatedCellCount()) {
+          setAssociatedCellCount(other.getAssociatedCellCount());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -10308,6 +10348,11 @@ public final class AdminProtos {
               keyValueBytes_.add(input.readBytes());
               break;
             }
+            case 24: {
+              bitField0_ |= 0x00000004;
+              associatedCellCount_ = input.readInt32();
+              break;
+            }
           }
         }
       }
@@ -10455,6 +10500,27 @@ public final class AdminProtos {
         return this;
       }
       
+      // optional int32 associatedCellCount = 3;
+      private int associatedCellCount_ ;
+      public boolean hasAssociatedCellCount() {
+        return ((bitField0_ & 0x00000004) == 0x00000004);
+      }
+      public int getAssociatedCellCount() {
+        return associatedCellCount_;
+      }
+      public Builder setAssociatedCellCount(int value) {
+        bitField0_ |= 0x00000004;
+        associatedCellCount_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearAssociatedCellCount() {
+        bitField0_ = (bitField0_ & ~0x00000004);
+        associatedCellCount_ = 0;
+        onChanged();
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:WALEntry)
     }
     
@@ -15359,41 +15425,41 @@ public final class AdminProtos {
       "gionsRequest\022!\n\007regionA\030\001 \002(\0132\020.RegionSp" +
       "ecifier\022!\n\007regionB\030\002 \002(\0132\020.RegionSpecifi" +
       "er\022\027\n\010forcible\030\003 \001(\010:\005false\"\026\n\024MergeRegi" +
-      "onsResponse\"7\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.W",
-      "ALKey\022\025\n\rkeyValueBytes\030\002 \003(\014\"4\n\030Replicat" +
-      "eWALEntryRequest\022\030\n\005entry\030\001 \003(\0132\t.WALEnt" +
-      "ry\"\033\n\031ReplicateWALEntryResponse\"\026\n\024RollW" +
-      "ALWriterRequest\".\n\025RollWALWriterResponse" +
-      "\022\025\n\rregionToFlush\030\001 \003(\014\"#\n\021StopServerReq" +
-      "uest\022\016\n\006reason\030\001 \002(\t\"\024\n\022StopServerRespon" +
-      "se\"\026\n\024GetServerInfoRequest\"@\n\nServerInfo" +
-      "\022\037\n\nserverName\030\001 \002(\0132\013.ServerName\022\021\n\tweb" +
-      "uiPort\030\002 \001(\r\"8\n\025GetServerInfoResponse\022\037\n" +
-      "\nserverInfo\030\001 \002(\0132\013.ServerInfo2\337\006\n\014Admin",
-      "Service\022>\n\rgetRegionInfo\022\025.GetRegionInfo" +
-      "Request\032\026.GetRegionInfoResponse\022;\n\014getSt" +
-      "oreFile\022\024.GetStoreFileRequest\032\025.GetStore" +
-      "FileResponse\022D\n\017getOnlineRegion\022\027.GetOnl" +
-      "ineRegionRequest\032\030.GetOnlineRegionRespon" +
-      "se\0225\n\nopenRegion\022\022.OpenRegionRequest\032\023.O" +
-      "penRegionResponse\0228\n\013closeRegion\022\023.Close" +
-      "RegionRequest\032\024.CloseRegionResponse\0228\n\013f" +
-      "lushRegion\022\023.FlushRegionRequest\032\024.FlushR" +
-      "egionResponse\0228\n\013splitRegion\022\023.SplitRegi",
-      "onRequest\032\024.SplitRegionResponse\022>\n\rcompa" +
-      "ctRegion\022\025.CompactRegionRequest\032\026.Compac" +
-      "tRegionResponse\022;\n\014mergeRegions\022\024.MergeR" +
-      "egionsRequest\032\025.MergeRegionsResponse\022J\n\021" +
-      "replicateWALEntry\022\031.ReplicateWALEntryReq" +
-      "uest\032\032.ReplicateWALEntryResponse\022\'\n\006repl" +
-      "ay\022\r.MultiRequest\032\016.MultiResponse\022>\n\rrol" +
-      "lWALWriter\022\025.RollWALWriterRequest\032\026.Roll" +
-      "WALWriterResponse\022>\n\rgetServerInfo\022\025.Get" +
-      "ServerInfoRequest\032\026.GetServerInfoRespons",
-      "e\0225\n\nstopServer\022\022.StopServerRequest\032\023.St" +
-      "opServerResponseBA\n*org.apache.hadoop.hb" +
-      "ase.protobuf.generatedB\013AdminProtosH\001\210\001\001" +
-      "\240\001\001"
+      "onsResponse\"T\n\010WALEntry\022\024\n\003key\030\001 \002(\0132\007.W",
+      "ALKey\022\025\n\rkeyValueBytes\030\002 \003(\014\022\033\n\023associat" +
+      "edCellCount\030\003 \001(\005\"4\n\030ReplicateWALEntryRe" +
+      "quest\022\030\n\005entry\030\001 \003(\0132\t.WALEntry\"\033\n\031Repli" +
+      "cateWALEntryResponse\"\026\n\024RollWALWriterReq" +
+      "uest\".\n\025RollWALWriterResponse\022\025\n\rregionT" +
+      "oFlush\030\001 \003(\014\"#\n\021StopServerRequest\022\016\n\006rea" +
+      "son\030\001 \002(\t\"\024\n\022StopServerResponse\"\026\n\024GetSe" +
+      "rverInfoRequest\"@\n\nServerInfo\022\037\n\nserverN" +
+      "ame\030\001 \002(\0132\013.ServerName\022\021\n\twebuiPort\030\002 \001(" +
+      "\r\"8\n\025GetServerInfoResponse\022\037\n\nserverInfo",
+      "\030\001 \002(\0132\013.ServerInfo2\337\006\n\014AdminService\022>\n\r" +
+      "getRegionInfo\022\025.GetRegionInfoRequest\032\026.G" +
+      "etRegionInfoResponse\022;\n\014getStoreFile\022\024.G" +
+      "etStoreFileRequest\032\025.GetStoreFileRespons" +
+      "e\022D\n\017getOnlineRegion\022\027.GetOnlineRegionRe" +
+      "quest\032\030.GetOnlineRegionResponse\0225\n\nopenR" +
+      "egion\022\022.OpenRegionRequest\032\023.OpenRegionRe" +
+      "sponse\0228\n\013closeRegion\022\023.CloseRegionReque" +
+      "st\032\024.CloseRegionResponse\0228\n\013flushRegion\022" +
+      "\023.FlushRegionRequest\032\024.FlushRegionRespon",
+      "se\0228\n\013splitRegion\022\023.SplitRegionRequest\032\024" +
+      ".SplitRegionResponse\022>\n\rcompactRegion\022\025." +
+      "CompactRegionRequest\032\026.CompactRegionResp" +
+      "onse\022;\n\014mergeRegions\022\024.MergeRegionsReque" +
+      "st\032\025.MergeRegionsResponse\022J\n\021replicateWA" +
+      "LEntry\022\031.ReplicateWALEntryRequest\032\032.Repl" +
+      "icateWALEntryResponse\022\'\n\006replay\022\r.MultiR" +
+      "equest\032\016.MultiResponse\022>\n\rrollWALWriter\022" +
+      "\025.RollWALWriterRequest\032\026.RollWALWriterRe" +
+      "sponse\022>\n\rgetServerInfo\022\025.GetServerInfoR",
+      "equest\032\026.GetServerInfoResponse\0225\n\nstopSe" +
+      "rver\022\022.StopServerRequest\032\023.StopServerRes" +
+      "ponseBA\n*org.apache.hadoop.hbase.protobu" +
+      "f.generatedB\013AdminProtosH\001\210\001\001\240\001\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -15557,7 +15623,7 @@ public final class AdminProtos {
           internal_static_WALEntry_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_WALEntry_descriptor,
-              new java.lang.String[] { "Key", "KeyValueBytes", },
+              new java.lang.String[] { "Key", "KeyValueBytes", "AssociatedCellCount", },
               org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.class,
               org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry.Builder.class);
           internal_static_ReplicateWALEntryRequest_descriptor =

Modified: hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto (original)
+++ hbase/trunk/hbase-protocol/src/main/protobuf/Admin.proto Tue Jul  2 21:35:30 2013
@@ -161,14 +161,18 @@ message MergeRegionsResponse {
 // Protocol buffer version of WAL for replication
 message WALEntry {
   required WALKey key = 1;
+  // The following may be empty if the KVs/Cells are carried alongside in a cellblock (see
+  // RPC for more on cellblocks). If Cells/KVs are in a cellblock, this field is left empty
+  // and associatedCellCount has the count of Cells associated with this WALEntry.
   repeated bytes keyValueBytes = 2;
+  // If Cell data is carried alongside in a cellblock, this is count of Cells in the cellblock.
+  optional int32 associatedCellCount = 3;
 }
 
 /**
  * Replicates the given entries. The guarantee is that the given entries
  * will be durable on the slave cluster if this method returns without
- * any exception.
- * hbase.replication has to be set to true for this to work.
+ * any exception.  hbase.replication has to be set to true for this to work.
  */
 message ReplicateWALEntryRequest {
   repeated WALEntry entry = 1;
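
A hedged builder sketch of the new shape: a sender shipping Cells in the cellblock leaves keyValueBytes empty and records only the count (walKey and kvs below stand in for values built elsewhere):

  // The Cells themselves travel in the rpc cellblock, not in the pb message; the
  // receiver advances its CellScanner associatedCellCount times per entry.
  AdminProtos.WALEntry entry = AdminProtos.WALEntry.newBuilder()
    .setKey(walKey)                        // a WALProtos.WALKey built elsewhere (assumed)
    .setAssociatedCellCount(kvs.size())    // kvs: the entry's KeyValues (assumed)
    .build();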

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java Tue Jul  2 21:35:30 2013
@@ -20,26 +20,32 @@
 package org.apache.hadoop.hbase.protobuf;
 
 
-import com.google.protobuf.ByteString;
-import com.google.protobuf.ServiceException;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+import java.util.UUID;
 
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.SizedCellScanner;
+import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.protobuf.generated.WALProtos;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableMap;
-import java.util.TreeMap;
-import java.util.UUID;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.ServiceException;
 
 public class ReplicationProtbufUtil {
   /**
@@ -81,10 +87,11 @@ public class ReplicationProtbufUtil {
    */
   public static void replicateWALEntry(final AdminService.BlockingInterface admin,
       final HLog.Entry[] entries) throws IOException {
-    AdminProtos.ReplicateWALEntryRequest request =
+    Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
       buildReplicateWALEntryRequest(entries);
     try {
-      admin.replicateWALEntry(null, request);
+      PayloadCarryingRpcController controller = new PayloadCarryingRpcController(p.getSecond());
+      admin.replicateWALEntry(controller, p.getFirst());
     } catch (ServiceException se) {
       throw ProtobufUtil.getRemoteException(se);
     }
@@ -94,10 +101,14 @@ public class ReplicationProtbufUtil {
    * Create a new ReplicateWALEntryRequest from a list of HLog entries
    *
    * @param entries the HLog entries to be replicated
-   * @return a ReplicateWALEntryRequest
+   * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
+   * found.
    */
-  public static AdminProtos.ReplicateWALEntryRequest
+  public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
       buildReplicateWALEntryRequest(final HLog.Entry[] entries) {
+    // Accumulate all the KVs seen in here.
+    List<List<? extends Cell>> allkvs = new ArrayList<List<? extends Cell>>(entries.length);
+    int size = 0;
     WALProtos.FamilyScope.Builder scopeBuilder = WALProtos.FamilyScope.newBuilder();
     AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
     AdminProtos.ReplicateWALEntryRequest.Builder builder =
@@ -128,13 +139,55 @@ public class ReplicationProtbufUtil {
           keyBuilder.addScopes(scopeBuilder.build());
         }
       }
-      List<KeyValue> keyValues = edit.getKeyValues();
-      for (KeyValue value: keyValues) {
-        entryBuilder.addKeyValueBytes(ByteString.copyFrom(
-          value.getBuffer(), value.getOffset(), value.getLength()));
+      List<KeyValue> kvs = edit.getKeyValues();
+      // Add up the size.  It is used later when serializing out the kvs.
+      for (KeyValue kv: kvs) {
+        size += kv.getLength();
       }
+      // Collect up the kvs
+      allkvs.add(kvs);
+      // Write out how many kvs associated with this entry.
+      entryBuilder.setAssociatedCellCount(kvs.size());
       builder.addEntry(entryBuilder.build());
     }
-    return builder.build();
+    return new Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>(builder.build(),
+      getCellScanner(allkvs, size));
+  }
+
+  /**
+   * @param cells Cells to package; <code>size</code> is their combined size in bytes
+   * @return <code>cells</code> packaged as a CellScanner
+   */
+  static CellScanner getCellScanner(final List<List<? extends Cell>> cells, final int size) {
+    return new SizedCellScanner() {
+      private final Iterator<List<? extends Cell>> entries = cells.iterator();
+      private Iterator<? extends Cell> currentIterator = null;
+      private Cell currentCell;
+
+      @Override
+      public Cell current() {
+        return this.currentCell;
+      }
+
+      @Override
+      public boolean advance() {
+        if (this.currentIterator == null) {
+          if (!this.entries.hasNext()) return false;
+          this.currentIterator = this.entries.next().iterator();
+        }
+        if (this.currentIterator.hasNext()) {
+          this.currentCell = this.currentIterator.next();
+          return true;
+        }
+        this.currentCell = null;
+        this.currentIterator = null;
+        return advance();
+      }
+
+      @Override
+      public long heapSize() {
+        return size;
+      }
+    };
   }
-}
+}
\ No newline at end of file
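
The getCellScanner above flattens per-entry Cell lists into one stream; a drain sketch with hypothetical KeyValues kv1..kv3 (package-private access ignored for illustration):

  List<List<? extends Cell>> all = new ArrayList<List<? extends Cell>>();
  all.add(Arrays.asList(kv1, kv2));  // Cells of entry 1
  all.add(Arrays.asList(kv3));       // Cells of entry 2
  CellScanner scanner = ReplicationProtbufUtil.getCellScanner(all, 0 /* size hint */);
  while (scanner.advance()) {        // advance() recurses past exhausted inner lists
    Cell cell = scanner.current();   // yields kv1, kv2, kv3 in order
  }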

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Jul  2 21:35:30 2013
@@ -2219,8 +2219,7 @@ public class HRegionServer implements Cl
                                          conf, server, fs, logDir, oldLogDir);
       server.replicationSinkHandler = (ReplicationSinkService)
                                          server.replicationSourceHandler;
-    }
-    else {
+    } else {
       server.replicationSourceHandler = (ReplicationSourceService)
                                          newReplicationInstance(sourceClassname,
                                          conf, server, fs, logDir, oldLogDir);
@@ -3715,15 +3714,14 @@ public class HRegionServer implements Cl
   @Override
   @QosPriority(priority=HConstants.REPLICATION_QOS)
   public ReplicateWALEntryResponse replicateWALEntry(final RpcController controller,
-      final ReplicateWALEntryRequest request) throws ServiceException {
+      final ReplicateWALEntryRequest request)
+  throws ServiceException {
     try {
       if (replicationSinkHandler != null) {
         checkOpen();
         requestCount.increment();
-        HLog.Entry[] entries = ReplicationProtbufUtil.toHLogEntries(request.getEntryList());
-        if (entries != null && entries.length > 0) {
-          replicationSinkHandler.replicateLogEntries(entries);
-        }
+        this.replicationSinkHandler.replicateLogEntries(request.getEntryList(),
+          ((PayloadCarryingRpcController)controller).cellScanner());
       }
       return ReplicateWALEntryResponse.newBuilder().build();
     } catch (IOException ie) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSinkService.java Tue Jul  2 21:35:30 2013
@@ -19,9 +19,11 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 
 /**
  * A sink for a replication stream has to expose this service.
@@ -30,11 +32,11 @@ import org.apache.hadoop.hbase.regionser
  */
 @InterfaceAudience.Private
 public interface ReplicationSinkService extends ReplicationService {
-
- /**
+  /**
    * Carry on the list of log entries down to the sink
-   * @param entries list of entries to replicate
+   * @param entries list of WALEntries to replicate
+   * @param cells Cells that the WALEntries refer to, carried alongside this list of entries
    * @throws IOException
    */
-  public void replicateLogEntries(HLog.Entry[] entries) throws IOException;
-}
+  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException;
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ReplicationSourceService.java Tue Jul  2 21:35:30 2013
@@ -18,8 +18,6 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.io.IOException;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 
@@ -30,10 +28,9 @@ import org.apache.hadoop.hbase.regionser
  */
 @InterfaceAudience.Private
 public interface ReplicationSourceService extends ReplicationService {
-
   /**
    * Returns a WALObserver for the service. This is needed to 
    * observe log rolls and log archival events.
    */
   public WALActionsListener getWALActionsListener();
-}
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Tue Jul  2 21:35:30 2013
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
+import java.util.List;
 import java.util.NavigableMap;
 import java.util.TreeMap;
 import java.util.concurrent.Executors;
@@ -33,13 +34,14 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
 import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
@@ -163,11 +165,14 @@ public class Replication implements WALA
   /**
    * Carry on the list of log entries down to the sink
    * @param entries list of entries to replicate
+   * @param cells The data -- the cells -- that <code>entries</code> describes (the entries
+   * do not contain the Cells we are replicating; they are passed here on the side in this
+   * CellScanner).
    * @throws IOException
    */
-  public void replicateLogEntries(HLog.Entry[] entries) throws IOException {
+  public void replicateLogEntries(List<WALEntry> entries, CellScanner cells) throws IOException {
     if (this.replication) {
-      this.replicationSink.replicateEntries(entries);
+      this.replicationSink.replicateEntries(entries, cells);
     }
   }
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Tue Jul  2 21:35:30 2013
@@ -34,19 +34,23 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.HTableInterface;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Row;
-import org.apache.hadoop.hbase.regionserver.wal.HLog;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 
@@ -108,17 +112,17 @@ public class ReplicationSink {
    }
 
   /**
-   * Replicate this array of entries directly into the local cluster
-   * using the native client.
+   * Replicate this list of entries directly into the local cluster using the native client.
+   * Unlike the HLog.Entry[] based version this replaces, it operates against the raw
+   * protobuf types, saving a conversion from pb to pojo.
    *
    * @param entries
+   * @param cells
    * @throws IOException
    */
-  public void replicateEntries(HLog.Entry[] entries)
-      throws IOException {
-    if (entries.length == 0) {
-      return;
-    }
+  public void replicateEntries(List<WALEntry> entries, final CellScanner cells) throws IOException {
+    if (entries.isEmpty()) return;
+    if (cells == null) throw new NullPointerException("TODO: Add handling of null CellScanner");
     // Very simple optimization where we batch sequences of rows going
     // to the same table.
     try {
@@ -126,40 +130,41 @@ public class ReplicationSink {
       // Map of table => list of Rows, we only want to flushCommits once per
       // invocation of this method per table.
       Map<byte[], List<Row>> rows = new TreeMap<byte[], List<Row>>(Bytes.BYTES_COMPARATOR);
-      for (HLog.Entry entry : entries) {
-        WALEdit edit = entry.getEdit();
-        byte[] table = entry.getKey().getTablename();
-        Put put = null;
-        Delete del = null;
-        KeyValue lastKV = null;
-        List<KeyValue> kvs = edit.getKeyValues();
-        for (KeyValue kv : kvs) {
-          if (lastKV == null || lastKV.getType() != kv.getType() || !lastKV.matchingRow(kv)) {
-            if (kv.isDelete()) {
-              del = new Delete(kv.getRow());
-              del.setClusterId(entry.getKey().getClusterId());
-              addToMultiMap(rows, table, del);
-            } else {
-              put = new Put(kv.getRow());
-              put.setClusterId(entry.getKey().getClusterId());
-              addToMultiMap(rows, table, put);
-            }
+      for (WALEntry entry : entries) {
+        byte[] table = entry.getKey().getTableName().toByteArray();
+        Cell previousCell = null;
+        Mutation m = null;
+        java.util.UUID uuid = toUUID(entry.getKey().getClusterId());
+        int count = entry.getAssociatedCellCount();
+        for (int i = 0; i < count; i++) {
+          // Throw index out of bounds if our cell count is off
+          if (!cells.advance()) {
+            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
           }
-          if (kv.isDelete()) {
-            del.addDeleteMarker(kv);
+          Cell cell = cells.current();
+          if (isNewRowOrType(previousCell, cell)) {
+            // Create new mutation
+            m = CellUtil.isDelete(cell)?
+              new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()):
+              new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength());
+            m.setClusterId(uuid);
+            addToMultiMap(rows, table, m);
+          }
+          if (CellUtil.isDelete(cell)) {
+            ((Delete)m).addDeleteMarker(KeyValueUtil.ensureKeyValue(cell));
           } else {
-            put.add(kv);
+            ((Put)m).add(KeyValueUtil.ensureKeyValue(cell));
           }
-          lastKV = kv;
+          previousCell = cell;
         }
         totalReplicated++;
       }
       for (Entry<byte[], List<Row>> entry : rows.entrySet()) {
         batch(entry.getKey(), entry.getValue());
       }
-      this.metrics.setAgeOfLastAppliedOp(
-          entries[entries.length-1].getKey().getWriteTime());
-      this.metrics.applyBatch(entries.length);
+      int size = entries.size();
+      this.metrics.setAgeOfLastAppliedOp(entries.get(size - 1).getKey().getWriteTime());
+      this.metrics.applyBatch(size);
       this.totalReplicatedEdits.addAndGet(totalReplicated);
     } catch (IOException ex) {
       LOG.error("Unable to accept edit because:", ex);
@@ -168,6 +173,20 @@ public class ReplicationSink {
   }
 
   /**
+   * @param previousCell
+   * @param cell
+   * @return True if we have crossed over onto a new row or type
+   */
+  private boolean isNewRowOrType(final Cell previousCell, final Cell cell) {
+    return previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() ||
+        !CellUtil.matchingRow(previousCell, cell);
+  }
+
+  private java.util.UUID toUUID(final HBaseProtos.UUID uuid) {
+    return new java.util.UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
+  }
+
+  /**
   * Simple helper to add to a map from key to (a list of) values
    * TODO: Make a general utility method
    * @param map

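For context: the reworked replicateEntries pairs each WALEntry with a run of Cells
pulled off a shared CellScanner, using the entry's associated cell count to know how
many cells belong to it. A minimal sketch of that consumption pattern, using only the
types visible in this patch (the readRun helper itself is illustrative, not part of
the patch):

    import java.io.IOException;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellScanner;

    public class CellRunReader {
      /** Drain exactly count cells for one entry, failing fast if the stream runs short. */
      static void readRun(final CellScanner cells, final int count) throws IOException {
        for (int i = 0; i < count; i++) {
          if (!cells.advance()) {
            throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i);
          }
          Cell cell = cells.current(); // valid only until the next advance()
          // The patch turns runs of same-row, same-type cells into one Put or Delete.
        }
      }
    }

Grouping consecutive cells of the same row and type into a single Mutation means a
multi-column edit for one row becomes a single Put rather than many.
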
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue Jul  2 21:35:30 2013
@@ -177,6 +177,9 @@ public class ReplicationSource extends T
         new PriorityBlockingQueue<Path>(
             conf.getInt("hbase.regionserver.maxlogs", 32),
             new LogsComparator());
+    // TODO: This connection is replication specific, or we should make it particular to
+    // replication and allow replication-specific settings such as compression or the
+    // codec to use when passing Cells.
     this.conn = HConnectionManager.getConnection(conf);
     this.zkHelper = manager.getRepZkWrapper();
     this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
@@ -456,7 +459,6 @@ public class ReplicationSource extends T
 
     // Connect to peer cluster first, unless we have to stop
     while (this.isActive() && this.currentPeers.size() == 0) {
-
       chooseSinks();
       if (this.isActive() && this.currentPeers.size() == 0) {
         if (sleepForRetries("Waiting for peers", sleepMultiplier)) {

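The TODO above leaves room for giving replication its own connection configuration.
One way that could look, as a sketch only (the codec override shown is an assumption
for illustration; this patch does not define any replication-specific keys):

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;

    public class ReplicationConnectionFactory {
      /** Build a connection whose settings apply to replication traffic only. */
      static HConnection getReplicationConnection(final Configuration conf) throws IOException {
        // Clone so replication-only overrides do not leak into the server's config.
        Configuration c = HBaseConfiguration.create(conf);
        // Assumed knob: the codec used to encode the cell blocks on the wire.
        c.set("hbase.client.rpc.codec", "org.apache.hadoop.hbase.codec.KeyValueCodec");
        return HConnectionManager.getConnection(c);
      }
    }
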
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java?rev=1499118&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/protobuf/TestReplicationProtobuf.java Tue Jul  2 21:35:30 2013
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.protobuf;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+@Category(SmallTests.class)
+public class TestReplicationProtobuf {
+  /**
+   * Little test to check we can convert a list of lists of KVs into a CellScanner
+   * @throws IOException
+   */
+  @Test
+  public void testGetCellScanner() throws IOException {
+    List<KeyValue> a = new ArrayList<KeyValue>();
+    KeyValue akv = new KeyValue(Bytes.toBytes("a"), -1L);
+    a.add(akv);
+    // Add a few just to make it less regular.
+    a.add(new KeyValue(Bytes.toBytes("aa"), -1L));
+    a.add(new KeyValue(Bytes.toBytes("aaa"), -1L));
+    List<KeyValue> b = new ArrayList<KeyValue>();
+    KeyValue bkv = new KeyValue(Bytes.toBytes("b"), -1L);
+    b.add(bkv);
+    List<KeyValue> c = new ArrayList<KeyValue>();
+    KeyValue ckv = new KeyValue(Bytes.toBytes("c"), -1L);
+    c.add(ckv);
+    List<List<? extends Cell>> all = new ArrayList<List<? extends Cell>>();
+    all.add(a);
+    all.add(b);
+    all.add(c);
+    CellScanner scanner = ReplicationProtbufUtil.getCellScanner(all, 0);
+    testAdvanceHasSameRow(scanner, akv);
+    // Skip over aa
+    assertTrue(scanner.advance());
+    // Skip over aaa
+    assertTrue(scanner.advance());
+    testAdvanceHasSameRow(scanner, bkv);
+    testAdvanceHasSameRow(scanner, ckv);
+    assertFalse(scanner.advance());
+  }
+
+  private void testAdvanceHasSameRow(CellScanner scanner, final KeyValue kv) throws IOException {
+    assertTrue(scanner.advance());
+    assertTrue(Bytes.equals(scanner.current().getRowArray(), scanner.current().getRowOffset(),
+        scanner.current().getRowLength(),
+      kv.getRowArray(), kv.getRowOffset(), kv.getRowLength()));
+  }
+}

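The new test drives ReplicationProtbufUtil.getCellScanner, which flattens a list of
Cell lists into a single scanner. For reference, the canonical iteration idiom over
any CellScanner, sketched here with an illustrative walk helper (not part of the
patch):

    import java.io.IOException;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellScanner;

    public class CellScannerWalker {
      /** Count the cells in a scanner; advance() must return true before current() is valid. */
      static int walk(final CellScanner scanner) throws IOException {
        int n = 0;
        while (scanner.advance()) {
          Cell cell = scanner.current(); // valid only until the next advance()
          n++;
        }
        return n;
      }
    }
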
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java?rev=1499118&r1=1499117&r2=1499118&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java Tue Jul  2 21:35:30 2013
@@ -449,13 +449,14 @@ public class TestReplicationSmallTests e
 
     scan = new Scan();
 
+    long start = System.currentTimeMillis();
     for (int i = 0; i < NB_RETRIES; i++) {
 
       scanner = htable2.getScanner(scan);
       res = scanner.next(NB_ROWS_IN_BIG_BATCH);
       scanner.close();
       if (res.length != NB_ROWS_IN_BIG_BATCH) {
-        if (i == NB_RETRIES-1) {
+        if (i == NB_RETRIES - 1) {
           int lastRow = -1;
           for (Result result : res) {
             int currentRow = Bytes.toInt(result.getRow());
@@ -465,8 +466,9 @@ public class TestReplicationSmallTests e
             lastRow = currentRow;
           }
           LOG.error("Last row: " + lastRow);
-          fail("Waited too much time for normal batch replication, "
-              + res.length + " instead of " + NB_ROWS_IN_BIG_BATCH);
+          fail("Waited too much time for normal batch replication, " +
+            res.length + " instead of " + NB_ROWS_IN_BIG_BATCH + "; waited=" +
+            (System.currentTimeMillis() - start) + "ms");
         } else {
           LOG.info("Only got " + res.length + " rows");
           Thread.sleep(SLEEP_TIME);