Posted to commits@hbase.apache.org by ap...@apache.org on 2017/05/30 22:01:03 UTC

[1/2] hbase git commit: HBASE-18027 HBaseInterClusterReplicationEndpoint should respect RPC limits when batching edits

Repository: hbase
Updated Branches:
  refs/heads/branch-1 1a37f3be8 -> 140c559a3
  refs/heads/master 6846b0394 -> d547feac6


HBASE-18027 HBaseInterClusterReplicationEndpoint should respect RPC limits when batching edits


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d547feac
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d547feac
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d547feac

Branch: refs/heads/master
Commit: d547feac6b673d59703e1a0ef46db38b26046e4c
Parents: 6846b03
Author: Andrew Purtell <ap...@apache.org>
Authored: Tue May 30 14:24:51 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue May 30 14:24:51 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |   4 +-
 .../hadoop/hbase/regionserver/wal/WALEdit.java  |   8 +
 .../HBaseInterClusterReplicationEndpoint.java   | 130 ++++--
 .../org/apache/hadoop/hbase/wal/WALKey.java     |  28 +-
 .../regionserver/TestReplicator.java            | 427 +++++++++++++++++++
 5 files changed, 551 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d547feac/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index d553647..0a3db40 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -171,7 +171,7 @@ public abstract class RpcServer implements RpcServerInterface,
 
   protected HBaseRPCErrorHandler errorHandler = null;
 
-  protected static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
+  public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
   protected static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION =
       new RequestTooBigException();
 
@@ -186,7 +186,7 @@ public abstract class RpcServer implements RpcServerInterface,
   protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
 
   /** Default value for above params */
-  protected static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
+  public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
   protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
   protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
 

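This hunk widens MAX_REQUEST_SIZE and DEFAULT_MAX_REQUEST_SIZE from protected to public so that code outside the ipc package, such as the replication endpoint below, can read the server's request-size ceiling. As a minimal sketch (not part of the patch; the class name is made up), a caller can derive the same 95% budget the endpoint computes:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.ipc.RpcServer;

    public class ReplicationRpcBudget {
      public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        // Read hbase.ipc.max.request.size (256M by default) and keep 5% slop
        // for encoding overhead, mirroring the endpoint change below.
        long max = conf.getLong(RpcServer.MAX_REQUEST_SIZE,
            RpcServer.DEFAULT_MAX_REQUEST_SIZE);
        System.out.println("Replication RPC budget: " + (int) (0.95 * max));
      }
    }
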
http://git-wip-us.apache.org/repos/asf/hbase/blob/d547feac/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index d5b95ee..0f32a1d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -171,6 +171,14 @@ public class WALEdit implements HeapSize {
     return ret;
   }
 
+  public long estimatedSerializedSizeOf() {
+    long ret = 0;
+    for (Cell cell: cells) {
+      ret += CellUtil.estimatedSerializedSizeOf(cell);
+    }
+    return ret;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();

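The new WALEdit.estimatedSerializedSizeOf() sums CellUtil.estimatedSerializedSizeOf(cell) over the edit's cells, giving a cheap estimate of the payload's wire size without serializing anything. A minimal usage sketch, with made-up row/family/qualifier values:

    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.apache.hadoop.hbase.util.Bytes;

    public class WALEditSizeExample {
      public static void main(String[] args) {
        WALEdit edit = new WALEdit();
        edit.add(CellUtil.createCell(Bytes.toBytes("row1"), Bytes.toBytes("f"),
            Bytes.toBytes("q"), System.currentTimeMillis(),
            KeyValue.Type.Put.getCode(), Bytes.toBytes("an 8K value here")));
        // An estimate of serialized size, not an exact byte count
        System.out.println("~" + edit.estimatedSerializedSizeOf() + " bytes");
      }
    }
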
http://git-wip-us.apache.org/repos/asf/hbase/blob/d547feac/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 97f28b4..30cec5b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -25,7 +25,9 @@ import java.net.ConnectException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
@@ -47,6 +49,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.ClusterConnection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@@ -56,7 +59,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.ipc.RemoteException;
-import javax.security.sasl.SaslException;
 
 /**
  * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
@@ -86,6 +88,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private int socketTimeoutMultiplier;
   // Amount of time for shutdown to wait for all tasks to complete
   private long maxTerminationWait;
+  // Size limit for replication RPCs, in bytes
+  private int replicationRpcLimit;
   //Metrics for this source
   private MetricsSource metrics;
   // Handles connecting to peer region servers
@@ -130,6 +134,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         new LinkedBlockingQueue<>());
     this.exec.allowCoreThreadTimeOut(true);
     this.abortable = ctx.getAbortable();
+    // Set the size limit for replication RPCs to 95% of the max request size.
+    // We could do with less slop if we have an accurate estimate of encoded size. Being
+    // conservative for now.
+    this.replicationRpcLimit = (int)(0.95 * (double)conf.getLong(RpcServer.MAX_REQUEST_SIZE,
+      RpcServer.DEFAULT_MAX_REQUEST_SIZE));
 
     this.replicationBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
@@ -185,16 +194,46 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     return sleepMultiplier < maxRetriesMultiplier;
   }
 
+  private List<List<Entry>> createBatches(final List<Entry> entries) {
+    int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
+    int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
+    // Maintains the current batch for a given partition index
+    Map<Integer, List<Entry>> entryMap = new HashMap<>(n);
+    List<List<Entry>> entryLists = new ArrayList<>();
+    int[] sizes = new int[n];
+
+    for (int i = 0; i < n; i++) {
+      entryMap.put(i, new ArrayList<Entry>(entries.size()/n+1));
+    }
+
+    for (Entry e: entries) {
+      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n);
+      int entrySize = (int)e.getKey().estimatedSerializedSizeOf() +
+          (int)e.getEdit().estimatedSerializedSizeOf();
+      // If this batch is oversized, add it to final list and initialize a new empty batch
+      if (sizes[index] > 0 /* must include at least one entry */ &&
+          sizes[index] + entrySize > replicationRpcLimit) {
+        entryLists.add(entryMap.get(index));
+        entryMap.put(index, new ArrayList<Entry>());
+        sizes[index] = 0;
+      }
+      entryMap.get(index).add(e);
+      sizes[index] += entrySize;
+    }
+
+    entryLists.addAll(entryMap.values());
+    return entryLists;
+  }
+
   /**
    * Do the shipping logic
    */
   @Override
   public boolean replicate(ReplicateContext replicateContext) {
     CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
-    List<Entry> entries = replicateContext.getEntries();
+    List<List<Entry>> batches;
     String walGroupId = replicateContext.getWalGroupId();
     int sleepMultiplier = 1;
-    int numReplicated = 0;
 
     if (!peersSelected && this.isRunning()) {
       connectToPeers();
@@ -208,22 +247,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       return false;
     }
 
-    // minimum of: configured threads, number of 100-waledit batches,
-    //  and number of current sinks
-    int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
+    batches = createBatches(replicateContext.getEntries());
 
-    List<List<Entry>> entryLists = new ArrayList<>(n);
-    if (n == 1) {
-      entryLists.add(entries);
-    } else {
-      for (int i=0; i<n; i++) {
-        entryLists.add(new ArrayList<>(entries.size()/n+1));
-      }
-      // now group by region
-      for (Entry e : entries) {
-        entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
-      }
-    }
     while (this.isRunning() && !exec.isShutdown()) {
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
@@ -232,35 +257,35 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         continue;
       }
       try {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Replicating " + entries.size() +
-              " entries of total size " + replicateContext.getSize());
-        }
-
         int futures = 0;
-        for (int i=0; i<entryLists.size(); i++) {
-          if (!entryLists.get(i).isEmpty()) {
+        for (int i=0; i<batches.size(); i++) {
+          List<Entry> entries = batches.get(i);
+          if (!entries.isEmpty()) {
             if (LOG.isTraceEnabled()) {
-              LOG.trace("Submitting " + entryLists.get(i).size() +
+              LOG.trace("Submitting " + entries.size() +
                   " entries of total size " + replicateContext.getSize());
             }
             // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
-            pool.submit(createReplicator(entryLists.get(i), i));
+            pool.submit(createReplicator(entries, i));
             futures++;
           }
         }
         IOException iox = null;
 
+        long lastWriteTime = 0;
         for (int i=0; i<futures; i++) {
           try {
             // wait for all futures, remove successful parts
             // (only the remaining parts will be retried)
             Future<Integer> f = pool.take();
             int index = f.get().intValue();
-            int batchSize =  entryLists.get(index).size();
-            entryLists.set(index, Collections.<Entry>emptyList());
-            // Now, we have marked the batch as done replicating, record its size
-            numReplicated += batchSize;
+            List<Entry> batch = batches.get(index);
+            batches.set(index, Collections.<Entry>emptyList()); // remove successful batch
+            // Find the most recent write time in the batch
+            long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
+            if (writeTime > lastWriteTime) {
+              lastWriteTime = writeTime;
+            }
           } catch (InterruptedException ie) {
             iox =  new IOException(ie);
           } catch (ExecutionException ee) {
@@ -272,15 +297,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
           // if we had any exceptions, try again
           throw iox;
         }
-        if (numReplicated != entries.size()) {
-          // Something went wrong here and we don't know what, let's just fail and retry.
-          LOG.warn("The number of edits replicated is different from the number received,"
-              + " failing for now.");
-          return false;
-        }
         // update metrics
-        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
-          walGroupId);
+        if (lastWriteTime > 0) {
+          this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
+        }
         return true;
 
       } catch (IOException ioe) {
@@ -374,17 +394,42 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       this.ordinal = ordinal;
     }
 
+    protected void replicateEntries(BlockingInterface rrs, final List<Entry> batch,
+        String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+        throws IOException {
+      if (LOG.isTraceEnabled()) {
+        long size = 0;
+        for (Entry e: entries) {
+          size += e.getKey().estimatedSerializedSizeOf();
+          size += e.getEdit().estimatedSerializedSizeOf();
+        }
+        LOG.trace("Replicating batch " + System.identityHashCode(entries) + " of " +
+            entries.size() + " entries with total size " + size + " bytes to " +
+            replicationClusterId);
+      }
+      try {
+        ReplicationProtbufUtil.replicateWALEntry(rrs, batch.toArray(new Entry[batch.size()]),
+          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Completed replicating batch " + System.identityHashCode(entries));
+        }
+      } catch (IOException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Failed replicating batch " + System.identityHashCode(entries), e);
+        }
+        throw e;
+      }
+    }
+
     @Override
     public Integer call() throws IOException {
       SinkPeer sinkPeer = null;
       try {
         sinkPeer = replicationSinkMgr.getReplicationSink();
         BlockingInterface rrs = sinkPeer.getRegionServer();
-        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
-          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+        replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir);
         replicationSinkMgr.reportSinkSuccess(sinkPeer);
         return ordinal;
-
       } catch (IOException ioe) {
         if (sinkPeer != null) {
           replicationSinkMgr.reportBadSink(sinkPeer);
@@ -392,6 +437,5 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         throw ioe;
       }
     }
-
   }
 }

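createBatches() replaces the old fixed grouping by region: entries are still hashed by encoded region name into one of n partitions (n capped by the thread count, entry count/100+1, and sink count), but each partition now tracks a running byte size, and a partition's current batch is closed out as soon as adding the next entry would exceed replicationRpcLimit. A batch always keeps at least one entry, so a single oversized edit still ships alone rather than stalling. A self-contained generic sketch of that scheme (not the patch itself; names are illustrative):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.ToIntFunction;
    import java.util.function.ToLongFunction;

    public class SizeCappedBatcher {
      // Hash items into n partitions and close a partition's batch whenever
      // adding the next item would push it past the byte limit.
      public static <T> List<List<T>> batch(List<T> items, int n, long limit,
          ToIntFunction<T> keyHash, ToLongFunction<T> sizeOf) {
        Map<Integer, List<T>> current = new HashMap<>(n);
        List<List<T>> out = new ArrayList<>();
        long[] sizes = new long[n];
        for (int i = 0; i < n; i++) {
          current.put(i, new ArrayList<T>());
        }
        for (T item : items) {
          int index = Math.abs(keyHash.applyAsInt(item) % n);
          long itemSize = sizeOf.applyAsLong(item);
          // Must include at least one item per batch, so only close a
          // non-empty batch that would otherwise overflow the limit.
          if (sizes[index] > 0 && sizes[index] + itemSize > limit) {
            out.add(current.get(index));
            current.put(index, new ArrayList<T>());
            sizes[index] = 0;
          }
          current.get(index).add(item);
          sizes[index] += itemSize;
        }
        out.addAll(current.values());
        return out;
      }
    }
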
http://git-wip-us.apache.org/repos/asf/hbase/blob/d547feac/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index bd03e4d..1d84c4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -618,4 +618,30 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
       this.origLogSeqNum = walKey.getOrigSequenceNumber();
     }
   }
-}
\ No newline at end of file
+
+  public long estimatedSerializedSizeOf() {
+    long size = encodedRegionName != null ? encodedRegionName.length : 0;
+    size += tablename != null ? tablename.toBytes().length : 0;
+    if (clusterIds != null) {
+      size += 16 * clusterIds.size();
+    }
+    if (nonceGroup != HConstants.NO_NONCE) {
+      size += Bytes.SIZEOF_LONG; // nonce group
+    }
+    if (nonce != HConstants.NO_NONCE) {
+      size += Bytes.SIZEOF_LONG; // nonce
+    }
+    if (replicationScope != null) {
+      for (Map.Entry<byte[], Integer> scope: replicationScope.entrySet()) {
+        size += scope.getKey().length;
+        size += Bytes.SIZEOF_INT;
+      }
+    }
+    size += Bytes.SIZEOF_LONG; // sequence number
+    size += Bytes.SIZEOF_LONG; // write time
+    if (origLogSeqNum > 0) {
+      size += Bytes.SIZEOF_LONG; // original sequence number
+    }
+    return size;
+  }
+}

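To make the WALKey estimate concrete: a key with a 32-byte encoded region name, an 8-byte table name, one cluster UUID, no nonces, a single replication scope entry with a 2-byte family name, and no original sequence number comes out to 32 + 8 + 16 + (2 + 4) + 8 + 8 = 78 bytes (region name, table name, 16 bytes per UUID, scope key plus a 4-byte int, then 8-byte longs for the sequence number and write time). The estimate ignores protobuf tag and varint framing, which is part of why the endpoint keeps 5% slop rather than batching right up to the limit.
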
http://git-wip-us.apache.org/repos/asf/hbase/blob/d547feac/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
new file mode 100644
index 0000000..7da56a3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.*;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+@Category(MediumTests.class)
+public class TestReplicator extends TestReplicationBase {
+
+  static final Log LOG = LogFactory.getLog(TestReplicator.class);
+  static final int NUM_ROWS = 10;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Set RPC size limit to 10kb (will be applied to both source and sink clusters)
+    conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
+    TestReplicationBase.setUpBeforeClass();
+    admin.removePeer("2"); // Remove the peer set up for us by base class
+  }
+
+  @Test
+  public void testReplicatorBatching() throws Exception {
+    // Clear the tables
+    truncateTable(utility1, tableName);
+    truncateTable(utility2, tableName);
+
+    // Replace the peer set up for us by the base class with a wrapper for this test
+    admin.addPeer("testReplicatorBatching",
+      new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
+        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+
+    ReplicationEndpointForTest.setBatchCount(0);
+    ReplicationEndpointForTest.setEntriesCount(0);
+    try {
+      ReplicationEndpointForTest.pause();
+      try {
+        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
+        // have to be replicated separately.
+        final byte[] valueBytes = new byte[8 *1024];
+        for (int i = 0; i < NUM_ROWS; i++) {
+          htable1.put(new Put(("row"+Integer.toString(i)).getBytes())
+            .addColumn(famName, null, valueBytes)
+          );
+        }
+      } finally {
+        ReplicationEndpointForTest.resume();
+      }
+
+      // Wait for replication to complete.
+      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
+        }
+
+        @Override
+        public String explainFailure() throws Exception {
+          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
+        }
+      });
+
+      assertEquals("We sent an incorrect number of batches", NUM_ROWS,
+        ReplicationEndpointForTest.getBatchCount());
+      assertEquals("We did not replicate enough rows", NUM_ROWS,
+        utility2.countRows(htable2));
+    } finally {
+      admin.removePeer("testReplicatorBatching");
+    }
+  }
+
+  @Test
+  public void testReplicatorWithErrors() throws Exception {
+    // Clear the tables
+    truncateTable(utility1, tableName);
+    truncateTable(utility2, tableName);
+
+    // Replace the peer set up for us by the base class with a wrapper for this test
+    admin.addPeer("testReplicatorWithErrors",
+      new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
+          .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()),
+        null);
+
+    FailureInjectingReplicationEndpointForTest.setBatchCount(0);
+    FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
+    try {
+      FailureInjectingReplicationEndpointForTest.pause();
+      try {
+        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
+        // have to be replicated separately.
+        final byte[] valueBytes = new byte[8 *1024];
+        for (int i = 0; i < NUM_ROWS; i++) {
+          htable1.put(new Put(("row"+Integer.toString(i)).getBytes())
+            .addColumn(famName, null, valueBytes)
+          );
+        }
+      } finally {
+        FailureInjectingReplicationEndpointForTest.resume();
+      }
+
+      // Wait for replication to complete.
+      // We can expect 10 batches
+      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
+        }
+
+        @Override
+        public String explainFailure() throws Exception {
+          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
+        }
+      });
+
+      assertEquals("We did not replicate enough rows", NUM_ROWS,
+        utility2.countRows(htable2));
+    } finally {
+      admin.removePeer("testReplicatorWithErrors");
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TestReplicationBase.tearDownAfterClass();
+  }
+
+  private void truncateTable(HBaseTestingUtility util, TableName tablename) throws IOException {
+    HBaseAdmin admin = util.getHBaseAdmin();
+    admin.disableTable(tableName);
+    admin.truncateTable(tablename, false);
+  }
+
+  public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {
+
+    private static int batchCount;
+    private static int entriesCount;
+    private static final Object latch = new Object();
+    private static AtomicBoolean useLatch = new AtomicBoolean(false);
+
+    public static void resume() {
+      useLatch.set(false);
+      synchronized (latch) {
+        latch.notifyAll();
+      }
+    }
+
+    public static void pause() {
+      useLatch.set(true);
+    }
+
+    public static void await() throws InterruptedException {
+      if (useLatch.get()) {
+        LOG.info("Waiting on latch");
+        latch.wait();
+        LOG.info("Waited on latch, now proceeding");
+      }
+    }
+
+    public static int getBatchCount() {
+      return batchCount;
+    }
+
+    public static void setBatchCount(int i) {
+      batchCount = i;
+    }
+
+    public static int getEntriesCount() {
+      return entriesCount;
+    }
+
+    public static void setEntriesCount(int i) {
+      entriesCount = i;
+    }
+
+    public class ReplicatorForTest extends Replicator {
+
+      public ReplicatorForTest(List<Entry> entries, int ordinal) {
+        super(entries, ordinal);
+      }
+
+      @Override
+      protected void replicateEntries(BlockingInterface rrs, final List<Entry> entries,
+          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+          throws IOException {
+        try {
+          long size = 0;
+          for (Entry e: entries) {
+            size += e.getKey().estimatedSerializedSizeOf();
+            size += e.getEdit().estimatedSerializedSizeOf();
+          }
+          LOG.info("Replicating batch " + System.identityHashCode(entries) + " of " +
+              entries.size() + " entries with total size " + size + " bytes to " +
+              replicationClusterId);
+          super.replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir,
+            hfileArchiveDir);
+          entriesCount += entries.size();
+          batchCount++;
+          LOG.info("Completed replicating batch " + System.identityHashCode(entries));
+        } catch (IOException e) {
+          LOG.info("Failed to replicate batch " + System.identityHashCode(entries), e);
+          throw e;
+        }
+      }
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      try {
+        await();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted waiting for latch", e);
+      }
+      return super.replicate(replicateContext);
+    }
+
+    @Override
+    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+      return new ReplicatorForTest(entries, ordinal);
+    }
+  }
+
+  public static class FailureInjectingReplicationEndpointForTest
+      extends ReplicationEndpointForTest {
+
+    static class FailureInjectingBlockingInterface implements BlockingInterface {
+
+      private final BlockingInterface delegate;
+      private volatile boolean failNext;
+
+      public FailureInjectingBlockingInterface(BlockingInterface delegate) {
+        this.delegate = delegate;
+      }
+
+      @Override
+      public GetRegionInfoResponse getRegionInfo(RpcController controller,
+          GetRegionInfoRequest request) throws ServiceException {
+        return delegate.getRegionInfo(controller, request);
+      }
+
+      @Override
+      public GetStoreFileResponse getStoreFile(RpcController controller,
+          GetStoreFileRequest request) throws ServiceException {
+        return delegate.getStoreFile(controller, request);
+      }
+
+      @Override
+      public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
+          GetOnlineRegionRequest request) throws ServiceException {
+        return delegate.getOnlineRegion(controller, request);
+      }
+
+      @Override
+      public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
+          throws ServiceException {
+        return delegate.openRegion(controller, request);
+      }
+
+      @Override
+      public WarmupRegionResponse warmupRegion(RpcController controller,
+          WarmupRegionRequest request) throws ServiceException {
+        return delegate.warmupRegion(controller, request);
+      }
+
+      @Override
+      public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
+          throws ServiceException {
+        return delegate.closeRegion(controller, request);
+      }
+
+      @Override
+      public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
+          throws ServiceException {
+        return delegate.flushRegion(controller, request);
+      }
+
+      @Override
+      public SplitRegionResponse splitRegion(RpcController controller, SplitRegionRequest request)
+          throws ServiceException {
+        return delegate.splitRegion(controller, request);
+      }
+
+      @Override
+      public CompactRegionResponse compactRegion(RpcController controller,
+          CompactRegionRequest request) throws ServiceException {
+        return delegate.compactRegion(controller, request);
+      }
+
+      @Override
+      public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
+          ReplicateWALEntryRequest request) throws ServiceException {
+        if (!failNext) {
+          failNext = true;
+          return delegate.replicateWALEntry(controller, request);
+        } else {
+          failNext = false;
+          throw new ServiceException("Injected failure");
+        }
+      }
+
+      @Override
+      public ReplicateWALEntryResponse replay(RpcController controller,
+          ReplicateWALEntryRequest request) throws ServiceException {
+        return delegate.replay(controller, request);
+      }
+
+      @Override
+      public RollWALWriterResponse rollWALWriter(RpcController controller,
+          RollWALWriterRequest request) throws ServiceException {
+        return delegate.rollWALWriter(controller, request);
+      }
+
+      @Override
+      public GetServerInfoResponse getServerInfo(RpcController controller,
+          GetServerInfoRequest request) throws ServiceException {
+        return delegate.getServerInfo(controller, request);
+      }
+
+      @Override
+      public StopServerResponse stopServer(RpcController controller, StopServerRequest request)
+          throws ServiceException {
+        return delegate.stopServer(controller, request);
+      }
+
+      @Override
+      public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
+          UpdateFavoredNodesRequest request) throws ServiceException {
+        return delegate.updateFavoredNodes(controller, request);
+      }
+
+      @Override
+      public UpdateConfigurationResponse updateConfiguration(RpcController controller,
+          UpdateConfigurationRequest request) throws ServiceException {
+        return delegate.updateConfiguration(controller, request);
+      }
+
+      @Override
+      public CloseRegionForSplitOrMergeResponse closeRegionForSplitOrMerge(RpcController controller,
+          CloseRegionForSplitOrMergeRequest request) throws ServiceException {
+        return delegate.closeRegionForSplitOrMerge(controller, request);
+      }
+
+      @Override
+      public GetRegionLoadResponse getRegionLoad(RpcController controller,
+          GetRegionLoadRequest request) throws ServiceException {
+        return delegate.getRegionLoad(controller, request);
+      }
+
+      @Override
+      public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
+          ClearCompactionQueuesRequest request) throws ServiceException {
+        return delegate.clearCompactionQueues(controller, request);
+      }
+
+      @Override
+      public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
+          GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
+        return delegate.getSpaceQuotaSnapshots(controller, request);
+      }
+    }
+
+    public class FailureInjectingReplicatorForTest extends ReplicatorForTest {
+
+      public FailureInjectingReplicatorForTest(List<Entry> entries, int ordinal) {
+        super(entries, ordinal);
+      }
+
+      @Override
+      protected void replicateEntries(BlockingInterface rrs, List<Entry> entries,
+          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+          throws IOException {
+        super.replicateEntries(new FailureInjectingBlockingInterface(rrs), entries,
+          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+      }
+    }
+
+    @Override
+    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+      return new FailureInjectingReplicatorForTest(entries, ordinal);
+    }
+  }
+
+}

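One caveat when reusing the test's pause/resume latch elsewhere: Object.wait() must be invoked while holding the target object's monitor, so the conventional guarded-wait form of await() wraps the wait in a synchronized block and loops on the condition to absorb spurious wakeups. A minimal sketch of that idiom against the same useLatch flag:

    // Guarded wait: hold latch's monitor around wait() and re-check the
    // condition in a loop; resume() already notifies under the same monitor.
    public static void await() throws InterruptedException {
      synchronized (latch) {
        while (useLatch.get()) {
          LOG.info("Waiting on latch");
          latch.wait();
        }
      }
      LOG.info("Waited on latch, now proceeding");
    }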

[2/2] hbase git commit: HBASE-18027 HBaseInterClusterReplicationEndpoint should respect RPC limits when batching edits

Posted by ap...@apache.org.
HBASE-18027 HBaseInterClusterReplicationEndpoint should respect RPC limits when batching edits


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/140c559a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/140c559a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/140c559a

Branch: refs/heads/branch-1
Commit: 140c559a3a2528bc1485852bb9b5251901d58798
Parents: 1a37f3b
Author: Andrew Purtell <ap...@apache.org>
Authored: Tue May 30 14:25:36 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue May 30 14:25:36 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  |   4 +-
 .../hadoop/hbase/regionserver/wal/WALEdit.java  |   8 +
 .../HBaseInterClusterReplicationEndpoint.java   | 130 ++++--
 .../org/apache/hadoop/hbase/wal/WALKey.java     |  27 +-
 .../regionserver/TestReplicator.java            | 408 +++++++++++++++++++
 5 files changed, 531 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/140c559a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 232b0e8..5617acb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -258,7 +258,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
 
   protected HBaseRPCErrorHandler errorHandler = null;
 
-  static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
+  public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
   private static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION =
       new RequestTooBigException();
 
@@ -274,7 +274,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
 
   /** Default value for above params */
-  private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
+  public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
   private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
   private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/140c559a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index 4fd1c41..832c3fc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -292,6 +292,14 @@ public class WALEdit implements Writable, HeapSize {
     return ret;
   }
 
+  public long estimatedSerializedSizeOf() {
+    long ret = 0;
+    for (Cell cell: cells) {
+      ret += CellUtil.estimatedSerializedSizeOf(cell);
+    }
+    return ret;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();

http://git-wip-us.apache.org/repos/asf/hbase/blob/140c559a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 0acb33a..85bd11a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -23,7 +23,9 @@ import java.net.ConnectException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.ipc.RpcServer;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@@ -54,7 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.ipc.RemoteException;
-import javax.security.sasl.SaslException;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -86,6 +88,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private int socketTimeoutMultiplier;
   // Amount of time for shutdown to wait for all tasks to complete
   private long maxTerminationWait;
+  // Size limit for replication RPCs, in bytes
+  private int replicationRpcLimit;
   //Metrics for this source
   private MetricsSource metrics;
   // Handles connecting to peer region servers
@@ -130,6 +134,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         new LinkedBlockingQueue<Runnable>());
     this.exec.allowCoreThreadTimeOut(true);
     this.abortable = ctx.getAbortable();
+    // Set the size limit for replication RPCs to 95% of the max request size.
+    // We could do with less slop if we have an accurate estimate of encoded size. Being
+    // conservative for now.
+    this.replicationRpcLimit = (int)(0.95 * (double)conf.getLong(RpcServer.MAX_REQUEST_SIZE,
+      RpcServer.DEFAULT_MAX_REQUEST_SIZE));
 
     this.replicationBulkLoadDataEnabled =
         conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
@@ -185,16 +194,46 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     return sleepMultiplier < maxRetriesMultiplier;
   }
 
+  private List<List<Entry>> createBatches(final List<Entry> entries) {
+    int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
+    int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
+    // Maintains the current batch for a given partition index
+    Map<Integer, List<Entry>> entryMap = new HashMap<>(n);
+    List<List<Entry>> entryLists = new ArrayList<>();
+    int[] sizes = new int[n];
+
+    for (int i = 0; i < n; i++) {
+      entryMap.put(i, new ArrayList<Entry>(entries.size()/n+1));
+    }
+
+    for (Entry e: entries) {
+      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n);
+      int entrySize = (int)e.getKey().estimatedSerializedSizeOf() +
+          (int)e.getEdit().estimatedSerializedSizeOf();
+      // If this batch is oversized, add it to final list and initialize a new empty batch
+      if (sizes[index] > 0 /* must include at least one entry */ &&
+          sizes[index] + entrySize > replicationRpcLimit) {
+        entryLists.add(entryMap.get(index));
+        entryMap.put(index, new ArrayList<Entry>());
+        sizes[index] = 0;
+      }
+      entryMap.get(index).add(e);
+      sizes[index] += entrySize;
+    }
+
+    entryLists.addAll(entryMap.values());
+    return entryLists;
+  }
+
   /**
    * Do the shipping logic
    */
   @Override
   public boolean replicate(ReplicateContext replicateContext) {
     CompletionService<Integer> pool = new ExecutorCompletionService<Integer>(this.exec);
-    List<Entry> entries = replicateContext.getEntries();
+    List<List<Entry>> batches;
     String walGroupId = replicateContext.getWalGroupId();
     int sleepMultiplier = 1;
-    int numReplicated = 0;
 
     if (!peersSelected && this.isRunning()) {
       connectToPeers();
@@ -208,22 +247,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       return false;
     }
 
-    // minimum of: configured threads, number of 100-waledit batches,
-    //  and number of current sinks
-    int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
+    batches = createBatches(replicateContext.getEntries());
 
-    List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
-    if (n == 1) {
-      entryLists.add(entries);
-    } else {
-      for (int i=0; i<n; i++) {
-        entryLists.add(new ArrayList<Entry>(entries.size()/n+1));
-      }
-      // now group by region
-      for (Entry e : entries) {
-        entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
-      }
-    }
     while (this.isRunning() && !exec.isShutdown()) {
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
@@ -232,35 +257,35 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         continue;
       }
       try {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Replicating " + entries.size() +
-              " entries of total size " + replicateContext.getSize());
-        }
-
         int futures = 0;
-        for (int i=0; i<entryLists.size(); i++) {
-          if (!entryLists.get(i).isEmpty()) {
+        for (int i=0; i<batches.size(); i++) {
+          List<Entry> entries = batches.get(i);
+          if (!entries.isEmpty()) {
             if (LOG.isTraceEnabled()) {
-              LOG.trace("Submitting " + entryLists.get(i).size() +
+              LOG.trace("Submitting " + entries.size() +
                   " entries of total size " + replicateContext.getSize());
             }
             // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
-            pool.submit(createReplicator(entryLists.get(i), i));
+            pool.submit(createReplicator(entries, i));
             futures++;
           }
         }
         IOException iox = null;
 
+        long lastWriteTime = 0;
         for (int i=0; i<futures; i++) {
           try {
             // wait for all futures, remove successful parts
             // (only the remaining parts will be retried)
             Future<Integer> f = pool.take();
             int index = f.get().intValue();
-            int batchSize =  entryLists.get(index).size();
-            entryLists.set(index, Collections.<Entry>emptyList());
-            // Now, we have marked the batch as done replicating, record its size
-            numReplicated += batchSize;
+            List<Entry> batch = batches.get(index);
+            batches.set(index, Collections.<Entry>emptyList()); // remove successful batch
+            // Find the most recent write time in the batch
+            long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
+            if (writeTime > lastWriteTime) {
+              lastWriteTime = writeTime;
+            }
           } catch (InterruptedException ie) {
             iox =  new IOException(ie);
           } catch (ExecutionException ee) {
@@ -272,15 +297,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
           // if we had any exceptions, try again
           throw iox;
         }
-        if (numReplicated != entries.size()) {
-          // Something went wrong here and we don't know what, let's just fail and retry.
-          LOG.warn("The number of edits replicated is different from the number received,"
-              + " failing for now.");
-          return false;
-        }
         // update metrics
-        this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
-          walGroupId);
+        if (lastWriteTime > 0) {
+          this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
+        }
         return true;
 
       } catch (IOException ioe) {
@@ -374,17 +394,42 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       this.ordinal = ordinal;
     }
 
+    protected void replicateEntries(BlockingInterface rrs, final List<Entry> batch,
+        String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+        throws IOException {
+      if (LOG.isTraceEnabled()) {
+        long size = 0;
+        for (Entry e: entries) {
+          size += e.getKey().estimatedSerializedSizeOf();
+          size += e.getEdit().estimatedSerializedSizeOf();
+        }
+        LOG.trace("Replicating batch " + System.identityHashCode(entries) + " of " +
+            entries.size() + " entries with total size " + size + " bytes to " +
+            replicationClusterId);
+      }
+      try {
+        ReplicationProtbufUtil.replicateWALEntry(rrs, batch.toArray(new Entry[batch.size()]),
+          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Completed replicating batch " + System.identityHashCode(entries));
+        }
+      } catch (IOException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Failed replicating batch " + System.identityHashCode(entries), e);
+        }
+        throw e;
+      }
+    }
+
     @Override
     public Integer call() throws IOException {
       SinkPeer sinkPeer = null;
       try {
         sinkPeer = replicationSinkMgr.getReplicationSink();
         BlockingInterface rrs = sinkPeer.getRegionServer();
-        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
-          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+        replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir);
         replicationSinkMgr.reportSinkSuccess(sinkPeer);
         return ordinal;
-
       } catch (IOException ioe) {
         if (sinkPeer != null) {
           replicationSinkMgr.reportBadSink(sinkPeer);
@@ -392,6 +437,5 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         throw ioe;
       }
     }
-
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/140c559a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 01420d7..5cc7567 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -614,4 +614,29 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
     }
   }
 
-}
\ No newline at end of file
+  public long estimatedSerializedSizeOf() {
+    long size = encodedRegionName != null ? encodedRegionName.length : 0;
+    size += tablename != null ? tablename.toBytes().length : 0;
+    if (clusterIds != null) {
+      size += 16 * clusterIds.size();
+    }
+    if (nonceGroup != HConstants.NO_NONCE) {
+      size += Bytes.SIZEOF_LONG; // nonce group
+    }
+    if (nonce != HConstants.NO_NONCE) {
+      size += Bytes.SIZEOF_LONG; // nonce
+    }
+    if (scopes != null) {
+      for (Map.Entry<byte[], Integer> scope: scopes.entrySet()) {
+        size += scope.getKey().length;
+        size += Bytes.SIZEOF_INT;
+      }
+    }
+    size += Bytes.SIZEOF_LONG; // sequence number
+    size += Bytes.SIZEOF_LONG; // write time
+    if (origLogSeqNum > 0) {
+      size += Bytes.SIZEOF_LONG; // original sequence number
+    }
+    return size;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/140c559a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
new file mode 100644
index 0000000..6d15a1b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.*;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+@Category(MediumTests.class)
+public class TestReplicator extends TestReplicationBase {
+
+  static final Log LOG = LogFactory.getLog(TestReplicator.class);
+  static final int NUM_ROWS = 10;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Set RPC size limit to 10kb (will be applied to both source and sink clusters)
+    conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
+    TestReplicationBase.setUpBeforeClass();
+    admin.removePeer("2"); // Remove the peer set up for us by base class
+  }
+
+  @Test
+  public void testReplicatorBatching() throws Exception {
+    // Clear the tables
+    truncateTable(utility1, tableName);
+    truncateTable(utility2, tableName);
+
+    // Replace the peer set up for us by the base class with a wrapper for this test
+    admin.addPeer("testReplicatorBatching",
+      new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
+        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+
+    ReplicationEndpointForTest.setBatchCount(0);
+    ReplicationEndpointForTest.setEntriesCount(0);
+    try {
+      ReplicationEndpointForTest.pause();
+      try {
+        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
+        // have to be replicated separately.
+        final byte[] valueBytes = new byte[8 *1024];
+        for (int i = 0; i < NUM_ROWS; i++) {
+          htable1.put(new Put(("row"+Integer.toString(i)).getBytes())
+            .addColumn(famName, null, valueBytes)
+          );
+        }
+      } finally {
+        ReplicationEndpointForTest.resume();
+      }
+
+      // Wait for replication to complete.
+      // We can expect 10 batches, 1 row each
+      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
+        }
+
+        @Override
+        public String explainFailure() throws Exception {
+          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
+        }
+      });
+
+      assertEquals("We sent an incorrect number of batches", NUM_ROWS,
+        ReplicationEndpointForTest.getBatchCount());
+      assertEquals("We did not replicate enough rows", NUM_ROWS,
+        utility2.countRows(htable2));
+    } finally {
+      admin.removePeer("testReplicatorBatching");
+    }
+  }
+
+  @Test
+  public void testReplicatorWithErrors() throws Exception {
+    // Clear the tables
+    truncateTable(utility1, tableName);
+    truncateTable(utility2, tableName);
+
+    // Replace the peer set up for us by the base class with a wrapper for this test
+    admin.addPeer("testReplicatorWithErrors",
+      new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
+          .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()),
+        null);
+
+    FailureInjectingReplicationEndpointForTest.setBatchCount(0);
+    FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
+    try {
+      FailureInjectingReplicationEndpointForTest.pause();
+      try {
+        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
+        // have to be replicated separately.
+        final byte[] valueBytes = new byte[8 *1024];
+        for (int i = 0; i < NUM_ROWS; i++) {
+          htable1.put(new Put(("row"+Integer.toString(i)).getBytes())
+            .addColumn(famName, null, valueBytes)
+          );
+        }
+      } finally {
+        FailureInjectingReplicationEndpointForTest.resume();
+      }
+
+      // Wait for replication to complete.
+      // We can expect 10 batches
+      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
+        }
+
+        @Override
+        public String explainFailure() throws Exception {
+          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
+        }
+      });
+
+      assertEquals("We did not replicate enough rows", NUM_ROWS,
+        utility2.countRows(htable2));
+    } finally {
+      admin.removePeer("testReplicatorWithErrors");
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TestReplicationBase.tearDownAfterClass();
+  }
+
+  private void truncateTable(HBaseTestingUtility util, TableName table) throws IOException {
+    HBaseAdmin admin = util.getHBaseAdmin();
+    admin.disableTable(table);
+    admin.truncateTable(table, false);
+  }
+
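+  /**
+   * Test endpoint that counts the batches and entries it ships, and that can be paused so
+   * a test can queue up edits before any replication is allowed to proceed.
+   */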
+  public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {
+
+    // Written by shipper threads and read by the test's Waiter thread, hence volatile.
+    private static volatile int batchCount;
+    private static volatile int entriesCount;
+    private static final Object latch = new Object();
+    private static final AtomicBoolean useLatch = new AtomicBoolean(false);
+
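+    // pause() raises the flag, await() blocks shipper threads on the shared monitor while
+    // the flag is up, and resume() lowers it and wakes the waiters.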
+    public static void resume() {
+      useLatch.set(false);
+      synchronized (latch) {
+        latch.notifyAll();
+      }
+    }
+
+    public static void pause() {
+      useLatch.set(true);
+    }
+
+    public static void await() throws InterruptedException {
+      // Object.wait() requires holding the monitor, and the loop guards against spurious
+      // wakeups and a resume() that fires before the wait begins.
+      synchronized (latch) {
+        while (useLatch.get()) {
+          LOG.info("Waiting on latch");
+          latch.wait();
+          LOG.info("Waited on latch, now proceeding");
+        }
+      }
+    }
+
+    public static int getBatchCount() {
+      return batchCount;
+    }
+
+    public static void setBatchCount(int i) {
+      batchCount = i;
+    }
+
+    public static int getEntriesCount() {
+      return entriesCount;
+    }
+
+    public static void setEntriesCount(int i) {
+      entriesCount = i;
+    }
+
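+    /**
+     * Replicator that logs each batch and its estimated serialized size before delegating
+     * to the real shipping logic, then bumps the shared counters on success.
+     */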
+    public class ReplicatorForTest extends Replicator {
+
+      public ReplicatorForTest(List<Entry> entries, int ordinal) {
+        super(entries, ordinal);
+      }
+
+      @Override
+      protected void replicateEntries(BlockingInterface rrs, final List<Entry> entries,
+          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+          throws IOException {
+        try {
+          long size = 0;
+          for (Entry e: entries) {
+            size += e.getKey().estimatedSerializedSizeOf();
+            size += e.getEdit().estimatedSerializedSizeOf();
+          }
+          LOG.info("Replicating batch " + System.identityHashCode(entries) + " of " +
+              entries.size() + " entries with total size " + size + " bytes to " +
+              replicationClusterId);
+          super.replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir,
+            hfileArchiveDir);
+          // Multiple shipper threads may complete batches concurrently; synchronize the
+          // counter updates so none are lost.
+          synchronized (ReplicationEndpointForTest.class) {
+            entriesCount += entries.size();
+            batchCount++;
+          }
+          LOG.info("Completed replicating batch " + System.identityHashCode(entries));
+        } catch (IOException e) {
+          LOG.info("Failed to replicate batch " + System.identityHashCode(entries), e);
+          throw e;
+        }
+      }
+    }
+
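+    // Entry point for each shipment: block while the test holds the latch so nothing is
+    // replicated until resume() is called.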
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      try {
+        await();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted waiting for latch", e);
+      }
+      return super.replicate(replicateContext);
+    }
+
+    @Override
+    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+      return new ReplicatorForTest(entries, ordinal);
+    }
+  }
+
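+  /**
+   * Variant of the test endpoint that wraps the sink's BlockingInterface so that every
+   * other replicateWALEntry RPC fails, exercising the replicator's retry handling.
+   */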
+  public static class FailureInjectingReplicationEndpointForTest
+      extends ReplicationEndpointForTest {
+
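+    // Wraps the real AdminService stub; only replicateWALEntry is altered, every other
+    // admin RPC passes straight through to the delegate.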
+    static class FailureInjectingBlockingInterface implements BlockingInterface {
+
+      private final BlockingInterface delegate;
+      private volatile boolean failNext;
+
+      public FailureInjectingBlockingInterface(BlockingInterface delegate) {
+        this.delegate = delegate;
+      }
+
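+      // Alternate success and failure: the first call succeeds, the next throws an
+      // injected ServiceException, and so on, so roughly half of all attempts fail.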
+      @Override
+      public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
+          ReplicateWALEntryRequest request) throws ServiceException {
+        if (!failNext) {
+          failNext = true;
+          return delegate.replicateWALEntry(controller, request);
+        } else {
+          failNext = false;
+          throw new ServiceException("Injected failure");
+        }
+      }
+
+      @Override
+      public GetRegionInfoResponse getRegionInfo(RpcController controller,
+          GetRegionInfoRequest request) throws ServiceException {
+        return delegate.getRegionInfo(controller, request);
+      }
+
+      @Override
+      public GetStoreFileResponse getStoreFile(RpcController controller,
+          GetStoreFileRequest request) throws ServiceException {
+        return delegate.getStoreFile(controller, request);
+      }
+
+      @Override
+      public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
+          GetOnlineRegionRequest request) throws ServiceException {
+        return delegate.getOnlineRegion(controller, request);
+      }
+
+      @Override
+      public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
+          throws ServiceException {
+        return delegate.openRegion(controller, request);
+      }
+
+      @Override
+      public WarmupRegionResponse warmupRegion(RpcController controller,
+          WarmupRegionRequest request) throws ServiceException {
+        return delegate.warmupRegion(controller, request);
+      }
+
+      @Override
+      public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
+          throws ServiceException {
+        return delegate.closeRegion(controller, request);
+      }
+
+      @Override
+      public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
+          throws ServiceException {
+        return delegate.flushRegion(controller, request);
+      }
+
+      @Override
+      public SplitRegionResponse splitRegion(RpcController controller, SplitRegionRequest request)
+          throws ServiceException {
+        return delegate.splitRegion(controller, request);
+      }
+
+      @Override
+      public CompactRegionResponse compactRegion(RpcController controller,
+          CompactRegionRequest request) throws ServiceException {
+        return delegate.compactRegion(controller, request);
+      }
+
+      @Override
+      public MergeRegionsResponse mergeRegions(RpcController controller,
+          MergeRegionsRequest request) throws ServiceException {
+        return delegate.mergeRegions(controller, request);
+      }
+
+      @Override
+      public ReplicateWALEntryResponse replay(RpcController controller,
+          ReplicateWALEntryRequest request) throws ServiceException {
+        return delegate.replay(controller, request);
+      }
+
+      @Override
+      public RollWALWriterResponse rollWALWriter(RpcController controller,
+          RollWALWriterRequest request) throws ServiceException {
+        return delegate.rollWALWriter(controller, request);
+      }
+
+      @Override
+      public GetServerInfoResponse getServerInfo(RpcController controller,
+          GetServerInfoRequest request) throws ServiceException {
+        return delegate.getServerInfo(controller, request);
+      }
+
+      @Override
+      public StopServerResponse stopServer(RpcController controller, StopServerRequest request)
+          throws ServiceException {
+        return delegate.stopServer(controller, request);
+      }
+
+      @Override
+      public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
+          UpdateFavoredNodesRequest request) throws ServiceException {
+        return delegate.updateFavoredNodes(controller, request);
+      }
+
+      @Override
+      public UpdateConfigurationResponse updateConfiguration(RpcController controller,
+          UpdateConfigurationRequest request) throws ServiceException {
+        return delegate.updateConfiguration(controller, request);
+      }
+    }
+
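+    /**
+     * Replicator that ships through the failure-injecting wrapper instead of the raw stub.
+     */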
+    public class FailureInjectingReplicatorForTest extends ReplicatorForTest {
+
+      public FailureInjectingReplicatorForTest(List<Entry> entries, int ordinal) {
+        super(entries, ordinal);
+      }
+
+      @Override
+      protected void replicateEntries(BlockingInterface rrs, List<Entry> entries,
+          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+          throws IOException {
+        super.replicateEntries(new FailureInjectingBlockingInterface(rrs), entries,
+          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+      }
+    }
+
+    @Override
+    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+      return new FailureInjectingReplicatorForTest(entries, ordinal);
+    }
+  }
+
+}