You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2018/05/04 07:25:54 UTC

hbase git commit: HBASE-20481 Replicate entries from same region serially in ReplicationEndpoint for serial replication

Repository: hbase
Updated Branches:
  refs/heads/master 9b9f85147 -> 6225b4a49


HBASE-20481 Replicate entries from same region serially in ReplicationEndpoint for serial replication


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/6225b4a4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/6225b4a4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/6225b4a4

Branch: refs/heads/master
Commit: 6225b4a492c40a03475b666915b96984b25b3c47
Parents: 9b9f851
Author: huzheng <op...@gmail.com>
Authored: Wed May 2 10:44:42 2018 +0800
Committer: huzheng <op...@gmail.com>
Committed: Fri May 4 15:22:02 2018 +0800

----------------------------------------------------------------------
 .../hbase/replication/ReplicationEndpoint.java  |   2 +-
 .../HBaseInterClusterReplicationEndpoint.java   | 281 +++++++++---------
 .../TestReplicationAdminWithClusters.java       |   1 -
 .../replication/TestReplicationEndpoint.java    |  36 +--
 .../regionserver/TestReplicator.java            | 288 +++----------------
 .../TestSerialReplicationEndpoint.java          | 188 ++++++++++++
 6 files changed, 384 insertions(+), 412 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/6225b4a4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index 543dc2f..f4c37b1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -117,7 +117,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
   /**
    * Initialize the replication endpoint with the given context.
    * @param context replication context
-   * @throws IOException
+   * @throws IOException error occur when initialize the endpoint.
    */
   void init(Context context) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/6225b4a4/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index fd3c671..7db53aa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -24,9 +24,9 @@ import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
@@ -37,6 +37,9 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -108,6 +111,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private boolean replicationBulkLoadDataEnabled;
   private Abortable abortable;
   private boolean dropOnDeletedTables;
+  private boolean isSerial = false;
 
   @Override
   public void init(Context context) throws IOException {
@@ -160,6 +164,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     Path baseNSDir = new Path(HConstants.BASE_NAMESPACE_DIR);
     baseNamespaceDir = new Path(rootDir, baseNSDir);
     hfileArchiveDir = new Path(rootDir, new Path(HConstants.HFILE_ARCHIVE_DIRECTORY, baseNSDir));
+    isSerial = context.getPeerConfig().isSerial();
   }
 
   private void decorateConf() {
@@ -203,40 +208,60 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     return sleepMultiplier < maxRetriesMultiplier;
   }
 
-  private List<List<Entry>> createBatches(final List<Entry> entries) {
+  private int getEstimatedEntrySize(Entry e) {
+    long size = e.getKey().estimatedSerializedSizeOf() + e.getEdit().estimatedSerializedSizeOf();
+    return (int) size;
+  }
+
+  private List<List<Entry>> createParallelBatches(final List<Entry> entries) {
     int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
-    int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
-    // Maintains the current batch for a given partition index
-    Map<Integer, List<Entry>> entryMap = new HashMap<>(n);
-    List<List<Entry>> entryLists = new ArrayList<>();
+    int n = Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks);
+    List<List<Entry>> entryLists =
+        Stream.generate(ArrayList<Entry>::new).limit(n).collect(Collectors.toList());
     int[] sizes = new int[n];
-
-    for (int i = 0; i < n; i++) {
-      entryMap.put(i, new ArrayList<Entry>(entries.size()/n+1));
-    }
-
-    for (Entry e: entries) {
-      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n);
-      int entrySize = (int)e.getKey().estimatedSerializedSizeOf() +
-          (int)e.getEdit().estimatedSerializedSizeOf();
-      // If this batch is oversized, add it to final list and initialize a new empty batch
-      if (sizes[index] > 0 /* must include at least one entry */ &&
-          sizes[index] + entrySize > replicationRpcLimit) {
-        entryLists.add(entryMap.get(index));
-        entryMap.put(index, new ArrayList<Entry>());
+    for (Entry e : entries) {
+      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
+      int entrySize = getEstimatedEntrySize(e);
+      // If this batch has at least one entry and is over sized, move it to the tail of list and
+      // initialize the entryLists[index] to be a empty list.
+      if (sizes[index] > 0 && sizes[index] + entrySize > replicationRpcLimit) {
+        entryLists.add(entryLists.get(index));
+        entryLists.set(index, new ArrayList<>());
         sizes[index] = 0;
       }
-      entryMap.get(index).add(e);
+      entryLists.get(index).add(e);
       sizes[index] += entrySize;
     }
-
-    entryLists.addAll(entryMap.values());
     return entryLists;
   }
 
+  private List<List<Entry>> createSerialBatches(final List<Entry> entries) {
+    Map<byte[], List<Entry>> regionEntries = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (Entry e : entries) {
+      regionEntries.computeIfAbsent(e.getKey().getEncodedRegionName(), key -> new ArrayList<>())
+          .add(e);
+    }
+    return new ArrayList<>(regionEntries.values());
+  }
+
+  /**
+   * Divide the entries into multiple batches, so that we can replicate each batch in a thread pool
+   * concurrently. Note that, for serial replication, we need to make sure that entries from the
+   * same region to be replicated serially, so entries from the same region consist of a batch, and
+   * we will divide a batch into several batches by replicationRpcLimit in method
+   * serialReplicateRegionEntries()
+   */
+  private List<List<Entry>> createBatches(final List<Entry> entries) {
+    if (isSerial) {
+      return createSerialBatches(entries);
+    } else {
+      return createParallelBatches(entries);
+    }
+  }
+
   private TableName parseTable(String msg) {
     // ... TableNotFoundException: '<table>'/n...
-    Pattern p = Pattern.compile("TableNotFoundException: \\'([\\S]*)\\'");
+    Pattern p = Pattern.compile("TableNotFoundException: '([\\S]*)'");
     Matcher m = p.matcher(msg);
     if (m.find()) {
       String table = m.group(1);
@@ -252,17 +277,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
 
   // Filter a set of batches by TableName
   private List<List<Entry>> filterBatches(final List<List<Entry>> oldEntryList, TableName table) {
-    List<List<Entry>> entryLists = new ArrayList<>();
-    for (List<Entry> entries : oldEntryList) {
-      ArrayList<Entry> thisList = new ArrayList<Entry>(entries.size());
-      entryLists.add(thisList);
-      for (Entry e : entries) {
-        if (!e.getKey().getTableName().equals(table)) {
-          thisList.add(e);
-        }
-      }
-    }
-    return entryLists;
+    return oldEntryList
+        .stream().map(entries -> entries.stream()
+            .filter(e -> !e.getKey().getTableName().equals(table)).collect(Collectors.toList()))
+        .collect(Collectors.toList());
   }
 
   private void reconnectToPeerCluster() {
@@ -277,13 +295,55 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     }
   }
 
+  private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
+      List<List<Entry>> batches) throws IOException {
+    int futures = 0;
+    for (int i = 0; i < batches.size(); i++) {
+      List<Entry> entries = batches.get(i);
+      if (!entries.isEmpty()) {
+        LOG.trace("Submitting {} entries of total size {}", entries.size(),
+          replicateContext.getSize());
+        // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
+        pool.submit(createReplicator(entries, i));
+        futures++;
+      }
+    }
+
+    IOException iox = null;
+    long lastWriteTime = 0;
+    for (int i = 0; i < futures; i++) {
+      try {
+        // wait for all futures, remove successful parts
+        // (only the remaining parts will be retried)
+        Future<Integer> f = pool.take();
+        int index = f.get();
+        List<Entry> batch = batches.get(index);
+        batches.set(index, Collections.emptyList()); // remove successful batch
+        // Find the most recent write time in the batch
+        long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
+        if (writeTime > lastWriteTime) {
+          lastWriteTime = writeTime;
+        }
+      } catch (InterruptedException ie) {
+        iox = new IOException(ie);
+      } catch (ExecutionException ee) {
+        // cause must be an IOException
+        iox = (IOException) ee.getCause();
+      }
+    }
+    if (iox != null) {
+      // if we had any exceptions, try again
+      throw iox;
+    }
+    return lastWriteTime;
+  }
+
   /**
    * Do the shipping logic
    */
   @Override
   public boolean replicate(ReplicateContext replicateContext) {
     CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
-    List<List<Entry>> batches;
     String walGroupId = replicateContext.getWalGroupId();
     int sleepMultiplier = 1;
 
@@ -294,13 +354,12 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
 
     int numSinks = replicationSinkMgr.getNumSinks();
     if (numSinks == 0) {
-      LOG.warn("No replication sinks found, returning without replicating. The source should retry"
-          + " with the same set of edits.");
+      LOG.warn("No replication sinks found, returning without replicating. The source should " +
+          "retry with the same set of edits.");
       return false;
     }
 
-    batches = createBatches(replicateContext.getEntries());
-
+    List<List<Entry>> batches = createBatches(replicateContext.getEntries());
     while (this.isRunning() && !exec.isShutdown()) {
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
@@ -312,52 +371,16 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         reconnectToPeerCluster();
       }
       try {
-        int futures = 0;
-        for (int i=0; i<batches.size(); i++) {
-          List<Entry> entries = batches.get(i);
-          if (!entries.isEmpty()) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("Submitting " + entries.size() +
-                  " entries of total size " + replicateContext.getSize());
-            }
-            // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
-            pool.submit(createReplicator(entries, i));
-            futures++;
-          }
-        }
-        IOException iox = null;
-
-        long lastWriteTime = 0;
-        for (int i=0; i<futures; i++) {
-          try {
-            // wait for all futures, remove successful parts
-            // (only the remaining parts will be retried)
-            Future<Integer> f = pool.take();
-            int index = f.get().intValue();
-            List<Entry> batch = batches.get(index);
-            batches.set(index, Collections.<Entry>emptyList()); // remove successful batch
-            // Find the most recent write time in the batch
-            long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
-            if (writeTime > lastWriteTime) {
-              lastWriteTime = writeTime;
-            }
-          } catch (InterruptedException ie) {
-            iox =  new IOException(ie);
-          } catch (ExecutionException ee) {
-            // cause must be an IOException
-            iox = (IOException)ee.getCause();
-          }
-        }
-        if (iox != null) {
-          // if we had any exceptions, try again
-          throw iox;
-        }
+        long lastWriteTime;
+
+        // replicate the batches to sink side.
+        lastWriteTime = parallelReplicate(pool, replicateContext, batches);
+
         // update metrics
         if (lastWriteTime > 0) {
           this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
         }
         return true;
-
       } catch (IOException ioe) {
         // Didn't ship anything, but must still age the last time we did
         this.metrics.refreshAgeOfLastShippedOp(walGroupId);
@@ -376,7 +399,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
                     // Would potentially be better to retry in one of the outer loops
                     // and add a table filter there; but that would break the encapsulation,
                     // so we're doing the filtering here.
-                    LOG.info("Missing table detected at sink, local table also does not exist, filtering edits for '"+table+"'");
+                    LOG.info("Missing table detected at sink, local table also does not exist, " +
+                        "filtering edits for '" + table + "'");
                     batches = filterBatches(batches, table);
                     continue;
                   }
@@ -396,8 +420,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
             // happened, the cluster is alive and calling it right away
             // even for a test just makes things worse.
             sleepForRetries("Encountered a SocketTimeoutException. Since the " +
-              "call to the remote cluster timed out, which is usually " +
-              "caused by a machine failure or a massive slowdown",
+                  "call to the remote cluster timed out, which is usually " +
+                  "caused by a machine failure or a massive slowdown",
               this.socketTimeoutMultiplier);
           } else if (ioe instanceof ConnectException || ioe instanceof UnknownHostException) {
             LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
@@ -420,7 +444,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
 
   @Override
   protected void doStop() {
-    disconnect(); //don't call super.doStop()
+    disconnect(); // don't call super.doStop()
     if (this.conn != null) {
       try {
         this.conn.close();
@@ -446,61 +470,58 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   }
 
   @VisibleForTesting
-  protected Replicator createReplicator(List<Entry> entries, int ordinal) {
-    return new Replicator(entries, ordinal);
-  }
-
-  @VisibleForTesting
-  protected class Replicator implements Callable<Integer> {
-    private List<Entry> entries;
-    private int ordinal;
-    public Replicator(List<Entry> entries, int ordinal) {
-      this.entries = entries;
-      this.ordinal = ordinal;
-    }
-
-    protected void replicateEntries(BlockingInterface rrs, final List<Entry> batch,
-        String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
-        throws IOException {
+  protected int replicateEntries(List<Entry> entries, int batchIndex) throws IOException {
+    SinkPeer sinkPeer = null;
+    try {
+      int entriesHashCode = System.identityHashCode(entries);
       if (LOG.isTraceEnabled()) {
-        long size = 0;
-        for (Entry e: entries) {
-          size += e.getKey().estimatedSerializedSizeOf();
-          size += e.getEdit().estimatedSerializedSizeOf();
-        }
-        LOG.trace("Replicating batch " + System.identityHashCode(entries) + " of " +
-            entries.size() + " entries with total size " + size + " bytes to " +
-            replicationClusterId);
+        long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+        LOG.trace("Replicating batch {} of {} entries with total size {} bytes to {}",
+          entriesHashCode, entries.size(), size, replicationClusterId);
       }
+      sinkPeer = replicationSinkMgr.getReplicationSink();
+      BlockingInterface rrs = sinkPeer.getRegionServer();
       try {
-        ReplicationProtbufUtil.replicateWALEntry(rrs, batch.toArray(new Entry[batch.size()]),
+        ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
           replicationClusterId, baseNamespaceDir, hfileArchiveDir);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Completed replicating batch " + System.identityHashCode(entries));
-        }
+        LOG.trace("Completed replicating batch {}", entriesHashCode);
       } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Failed replicating batch " + System.identityHashCode(entries), e);
-        }
+        LOG.trace("Failed replicating batch {}", entriesHashCode, e);
         throw e;
       }
+      replicationSinkMgr.reportSinkSuccess(sinkPeer);
+    } catch (IOException ioe) {
+      if (sinkPeer != null) {
+        replicationSinkMgr.reportBadSink(sinkPeer);
+      }
+      throw ioe;
     }
+    return batchIndex;
+  }
 
-    @Override
-    public Integer call() throws IOException {
-      SinkPeer sinkPeer = null;
-      try {
-        sinkPeer = replicationSinkMgr.getReplicationSink();
-        BlockingInterface rrs = sinkPeer.getRegionServer();
-        replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir);
-        replicationSinkMgr.reportSinkSuccess(sinkPeer);
-        return ordinal;
-      } catch (IOException ioe) {
-        if (sinkPeer != null) {
-          replicationSinkMgr.reportBadSink(sinkPeer);
-        }
-        throw ioe;
+  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex)
+      throws IOException {
+    int batchSize = 0, index = 0;
+    List<Entry> batch = new ArrayList<>();
+    for (Entry entry : entries) {
+      int entrySize = getEstimatedEntrySize(entry);
+      if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
+        replicateEntries(batch, index++);
+        batch.clear();
+        batchSize = 0;
       }
+      batch.add(entry);
+      batchSize += entrySize;
+    }
+    if (batchSize > 0) {
+      replicateEntries(batch, index);
     }
+    return batchIndex;
+  }
+
+  @VisibleForTesting
+  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex) {
+    return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex)
+        : () -> replicateEntries(entries, batchIndex);
   }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6225b4a4/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
index 7be8c16..268fe00 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdminWithClusters.java
@@ -297,7 +297,6 @@ public class TestReplicationAdminWithClusters extends TestReplicationBase {
       notifyStopped();
     }
 
-
     @Override
     public UUID getPeerUUID() {
       return UUID.randomUUID();

http://git-wip-us.apache.org/repos/asf/hbase/blob/6225b4a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 3fca0ec..a3c20d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.UUID;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -447,40 +448,15 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     }
 
     @Override
-    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
       // Fail only once, we don't want to slow down the test.
       if (failedOnce) {
-        return new DummyReplicator(entries, ordinal);
+        return () -> ordinal;
       } else {
         failedOnce = true;
-        return new FailingDummyReplicator(entries, ordinal);
-      }
-    }
-
-    protected class DummyReplicator extends Replicator {
-
-      private int ordinal;
-
-      public DummyReplicator(List<Entry> entries, int ordinal) {
-        super(entries, ordinal);
-        this.ordinal = ordinal;
-      }
-
-      @Override
-      public Integer call() throws IOException {
-        return ordinal;
-      }
-    }
-
-    protected class FailingDummyReplicator extends DummyReplicator {
-
-      public FailingDummyReplicator(List<Entry> entries, int ordinal) {
-        super(entries, ordinal);
-      }
-
-      @Override
-      public Integer call() throws IOException {
-        throw new IOException("Sample Exception: Failed to replicate.");
+        return () -> {
+          throw new IOException("Sample Exception: Failed to replicate.");
+        };
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6225b4a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
index d8db3b1..24329a0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -21,9 +21,9 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.TableName;
@@ -39,57 +39,14 @@ import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearCompactionQueuesResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ClearRegionBlockCacheResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CloseRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.CompactRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionLoadResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetServerInfoResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
-
 @Category(MediumTests.class)
-@Ignore("Flaky, needs to be rewritten, see HBASE-19125")
 public class TestReplicator extends TestReplicationBase {
 
   @ClassRule
@@ -104,7 +61,6 @@ public class TestReplicator extends TestReplicationBase {
     // Set RPC size limit to 10kb (will be applied to both source and sink clusters)
     conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
     TestReplicationBase.setUpBeforeClass();
-    admin.removePeer("2"); // Remove the peer set up for us by base class
   }
 
   @Test
@@ -116,7 +72,8 @@ public class TestReplicator extends TestReplicationBase {
     // Replace the peer set up for us by the base class with a wrapper for this test
     admin.addPeer("testReplicatorBatching",
       new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
-        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+          .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()),
+      null);
 
     ReplicationEndpointForTest.setBatchCount(0);
     ReplicationEndpointForTest.setEntriesCount(0);
@@ -125,11 +82,10 @@ public class TestReplicator extends TestReplicationBase {
       try {
         // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
         // have to be replicated separately.
-        final byte[] valueBytes = new byte[8 *1024];
+        final byte[] valueBytes = new byte[8 * 1024];
         for (int i = 0; i < NUM_ROWS; i++) {
-          htable1.put(new Put(Bytes.toBytes("row"+Integer.toString(i)))
-            .addColumn(famName, null, valueBytes)
-          );
+          htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null,
+            valueBytes));
         }
       } finally {
         ReplicationEndpointForTest.resume();
@@ -151,8 +107,7 @@ public class TestReplicator extends TestReplicationBase {
 
       assertEquals("We sent an incorrect number of batches", NUM_ROWS,
         ReplicationEndpointForTest.getBatchCount());
-      assertEquals("We did not replicate enough rows", NUM_ROWS,
-        utility2.countRows(htable2));
+      assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2));
     } finally {
       admin.removePeer("testReplicatorBatching");
     }
@@ -168,7 +123,7 @@ public class TestReplicator extends TestReplicationBase {
     admin.addPeer("testReplicatorWithErrors",
       new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
           .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()),
-        null);
+      null);
 
     FailureInjectingReplicationEndpointForTest.setBatchCount(0);
     FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
@@ -177,11 +132,10 @@ public class TestReplicator extends TestReplicationBase {
       try {
         // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
         // have to be replicated separately.
-        final byte[] valueBytes = new byte[8 *1024];
+        final byte[] valueBytes = new byte[8 * 1024];
         for (int i = 0; i < NUM_ROWS; i++) {
-          htable1.put(new Put(Bytes.toBytes("row"+Integer.toString(i)))
-            .addColumn(famName, null, valueBytes)
-          );
+          htable1.put(new Put(Bytes.toBytes("row" + Integer.toString(i))).addColumn(famName, null,
+            valueBytes));
         }
       } finally {
         FailureInjectingReplicationEndpointForTest.resume();
@@ -201,8 +155,7 @@ public class TestReplicator extends TestReplicationBase {
         }
       });
 
-      assertEquals("We did not replicate enough rows", NUM_ROWS,
-        utility2.countRows(htable2));
+      assertEquals("We did not replicate enough rows", NUM_ROWS, utility2.countRows(htable2));
     } finally {
       admin.removePeer("testReplicatorWithErrors");
     }
@@ -221,8 +174,8 @@ public class TestReplicator extends TestReplicationBase {
 
   public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {
 
-    private static AtomicInteger batchCount = new AtomicInteger(0);
-    private static int entriesCount;
+    protected static AtomicInteger batchCount = new AtomicInteger(0);
+    protected static int entriesCount;
     private static final Object latch = new Object();
     private static AtomicBoolean useLatch = new AtomicBoolean(false);
 
@@ -240,7 +193,7 @@ public class TestReplicator extends TestReplicationBase {
     public static void await() throws InterruptedException {
       if (useLatch.get()) {
         LOG.info("Waiting on latch");
-        synchronized(latch) {
+        synchronized (latch) {
           latch.wait();
         }
         LOG.info("Waited on latch, now proceeding");
@@ -265,38 +218,6 @@ public class TestReplicator extends TestReplicationBase {
       entriesCount = i;
     }
 
-    public class ReplicatorForTest extends Replicator {
-
-      public ReplicatorForTest(List<Entry> entries, int ordinal) {
-        super(entries, ordinal);
-      }
-
-      @Override
-      protected void replicateEntries(BlockingInterface rrs, final List<Entry> entries,
-          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
-          throws IOException {
-        try {
-          long size = 0;
-          for (Entry e: entries) {
-            size += e.getKey().estimatedSerializedSizeOf();
-            size += e.getEdit().estimatedSerializedSizeOf();
-          }
-          LOG.info("Replicating batch " + System.identityHashCode(entries) + " of " +
-              entries.size() + " entries with total size " + size + " bytes to " +
-              replicationClusterId);
-          super.replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir,
-            hfileArchiveDir);
-          entriesCount += entries.size();
-          int count = batchCount.incrementAndGet();
-          LOG.info("Completed replicating batch " + System.identityHashCode(entries) +
-              " count=" + count);
-        } catch (IOException e) {
-          LOG.info("Failed to replicate batch " + System.identityHashCode(entries), e);
-          throw e;
-        }
-      }
-    }
-
     @Override
     public boolean replicate(ReplicateContext replicateContext) {
       try {
@@ -308,170 +229,37 @@ public class TestReplicator extends TestReplicationBase {
     }
 
     @Override
-    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
-      return new ReplicatorForTest(entries, ordinal);
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
+      return () -> {
+        int batchIndex = replicateEntries(entries, ordinal);
+        entriesCount += entries.size();
+        int count = batchCount.incrementAndGet();
+        LOG.info(
+          "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
+        return batchIndex;
+      };
     }
   }
 
   public static class FailureInjectingReplicationEndpointForTest
       extends ReplicationEndpointForTest {
+    private final AtomicBoolean failNext = new AtomicBoolean(false);
 
-    static class FailureInjectingBlockingInterface implements BlockingInterface {
-
-      private final BlockingInterface delegate;
-      private volatile boolean failNext;
-
-      public FailureInjectingBlockingInterface(BlockingInterface delegate) {
-        this.delegate = delegate;
-      }
-
-      @Override
-      public GetRegionInfoResponse getRegionInfo(RpcController controller,
-          GetRegionInfoRequest request) throws ServiceException {
-        return delegate.getRegionInfo(controller, request);
-      }
-
-      @Override
-      public GetStoreFileResponse getStoreFile(RpcController controller,
-          GetStoreFileRequest request) throws ServiceException {
-        return delegate.getStoreFile(controller, request);
-      }
-
-      @Override
-      public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
-          GetOnlineRegionRequest request) throws ServiceException {
-        return delegate.getOnlineRegion(controller, request);
-      }
-
-      @Override
-      public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
-          throws ServiceException {
-        return delegate.openRegion(controller, request);
-      }
-
-      @Override
-      public WarmupRegionResponse warmupRegion(RpcController controller,
-          WarmupRegionRequest request) throws ServiceException {
-        return delegate.warmupRegion(controller, request);
-      }
-
-      @Override
-      public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
-          throws ServiceException {
-        return delegate.closeRegion(controller, request);
-      }
-
-      @Override
-      public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
-          throws ServiceException {
-        return delegate.flushRegion(controller, request);
-      }
-
-      @Override
-      public CompactRegionResponse compactRegion(RpcController controller,
-          CompactRegionRequest request) throws ServiceException {
-        return delegate.compactRegion(controller, request);
-      }
-
-      @Override
-      public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
-          ReplicateWALEntryRequest request) throws ServiceException {
-        if (!failNext) {
-          failNext = true;
-          return delegate.replicateWALEntry(controller, request);
-        } else {
-          failNext = false;
+    @Override
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
+      return () -> {
+        if (failNext.compareAndSet(false, true)) {
+          int batchIndex = replicateEntries(entries, ordinal);
+          entriesCount += entries.size();
+          int count = batchCount.incrementAndGet();
+          LOG.info(
+            "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
+          return batchIndex;
+        } else if (failNext.compareAndSet(true, false)) {
           throw new ServiceException("Injected failure");
         }
-      }
-
-      @Override
-      public ReplicateWALEntryResponse replay(RpcController controller,
-          ReplicateWALEntryRequest request) throws ServiceException {
-        return delegate.replay(controller, request);
-      }
-
-      @Override
-      public RollWALWriterResponse rollWALWriter(RpcController controller,
-          RollWALWriterRequest request) throws ServiceException {
-        return delegate.rollWALWriter(controller, request);
-      }
-
-      @Override
-      public GetServerInfoResponse getServerInfo(RpcController controller,
-          GetServerInfoRequest request) throws ServiceException {
-        return delegate.getServerInfo(controller, request);
-      }
-
-      @Override
-      public StopServerResponse stopServer(RpcController controller, StopServerRequest request)
-          throws ServiceException {
-        return delegate.stopServer(controller, request);
-      }
-
-      @Override
-      public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
-          UpdateFavoredNodesRequest request) throws ServiceException {
-        return delegate.updateFavoredNodes(controller, request);
-      }
-
-      @Override
-      public UpdateConfigurationResponse updateConfiguration(RpcController controller,
-          UpdateConfigurationRequest request) throws ServiceException {
-        return delegate.updateConfiguration(controller, request);
-      }
-
-      @Override
-      public GetRegionLoadResponse getRegionLoad(RpcController controller,
-          GetRegionLoadRequest request) throws ServiceException {
-        return delegate.getRegionLoad(controller, request);
-      }
-
-      @Override
-      public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
-          ClearCompactionQueuesRequest request) throws ServiceException {
-        return delegate.clearCompactionQueues(controller, request);
-      }
-
-      @Override
-      public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
-          GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
-        return delegate.getSpaceQuotaSnapshots(controller, request);
-      }
-
-      @Override
-      public ExecuteProceduresResponse executeProcedures(RpcController controller,
-                                                         ExecuteProceduresRequest request)
-      throws ServiceException {
-        return null;
-      }
-
-      @Override
-      public ClearRegionBlockCacheResponse clearRegionBlockCache(RpcController controller,
-          ClearRegionBlockCacheRequest request) throws ServiceException {
-        return delegate.clearRegionBlockCache(controller, request);
-      }
-    }
-
-    public class FailureInjectingReplicatorForTest extends ReplicatorForTest {
-
-      public FailureInjectingReplicatorForTest(List<Entry> entries, int ordinal) {
-        super(entries, ordinal);
-      }
-
-      @Override
-      protected void replicateEntries(BlockingInterface rrs, List<Entry> entries,
-          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
-          throws IOException {
-        super.replicateEntries(new FailureInjectingBlockingInterface(rrs), entries,
-          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
-      }
-    }
-
-    @Override
-    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
-      return new FailureInjectingReplicatorForTest(entries, ordinal);
+        return ordinal;
+      };
     }
   }
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/6225b4a4/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
new file mode 100644
index 0000000..7d59d38
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestSerialReplicationEndpoint {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestSerialReplicationEndpoint.class);
+
+  private static HBaseTestingUtility UTIL = new HBaseTestingUtility();
+  private static Configuration CONF;
+  private static Connection CONN;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster();
+    CONF = UTIL.getConfiguration();
+    CONF.setLong(RpcServer.MAX_REQUEST_SIZE, 102400);
+    CONN = UTIL.getConnection();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    IOUtils.closeQuietly(CONN);
+    UTIL.shutdownMiniCluster();
+  }
+
+  private String getZKClusterKey() {
+    return String.format("127.0.0.1:%d:%s", UTIL.getZkCluster().getClientPort(),
+      CONF.get(HConstants.ZOOKEEPER_ZNODE_PARENT));
+  }
+
+  private void testHBaseReplicationEndpoint(String tableNameStr, String peerId, boolean isSerial)
+      throws IOException {
+    TestEndpoint.reset();
+    int cellNum = 10000;
+
+    TableName tableName = TableName.valueOf(tableNameStr);
+    byte[] family = Bytes.toBytes("f");
+    byte[] qualifier = Bytes.toBytes("q");
+    TableDescriptor td =
+        TableDescriptorBuilder.newBuilder(tableName).setColumnFamily(ColumnFamilyDescriptorBuilder
+            .newBuilder(family).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build();
+    UTIL.createTable(td, null);
+
+    try (Admin admin = CONN.getAdmin()) {
+      ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
+          .setClusterKey(getZKClusterKey()).setReplicationEndpointImpl(TestEndpoint.class.getName())
+          .setReplicateAllUserTables(false).setSerial(isSerial)
+          .setTableCFsMap(ImmutableMap.of(tableName, ImmutableList.of())).build();
+      admin.addReplicationPeer(peerId, peerConfig);
+    }
+
+    try (Table table = CONN.getTable(tableName)) {
+      for (int i = 0; i < cellNum; i++) {
+        Put put = new Put(Bytes.toBytes(i)).addColumn(family, qualifier, System.currentTimeMillis(),
+          Bytes.toBytes(i));
+        table.put(put);
+      }
+    }
+    Waiter.waitFor(CONF, 60000, () -> TestEndpoint.getEntries().size() >= cellNum);
+
+    int index = 0;
+    Assert.assertEquals(TestEndpoint.getEntries().size(), cellNum);
+    if (!isSerial) {
+      Collections.sort(TestEndpoint.getEntries(), (a, b) -> {
+        long seqA = a.getKey().getSequenceId();
+        long seqB = b.getKey().getSequenceId();
+        return seqA == seqB ? 0 : (seqA < seqB ? -1 : 1);
+      });
+    }
+    for (Entry entry : TestEndpoint.getEntries()) {
+      Assert.assertEquals(entry.getKey().getTableName(), tableName);
+      Assert.assertEquals(entry.getEdit().getCells().size(), 1);
+      Cell cell = entry.getEdit().getCells().get(0);
+      Assert.assertArrayEquals(
+        Bytes.copy(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()),
+        Bytes.toBytes(index));
+      index++;
+    }
+    Assert.assertEquals(index, cellNum);
+  }
+
+  @Test
+  public void testSerialReplicate() throws Exception {
+    testHBaseReplicationEndpoint("testSerialReplicate", "100", true);
+  }
+
+  @Test
+  public void testParallelReplicate() throws Exception {
+    testHBaseReplicationEndpoint("testParallelReplicate", "101", false);
+  }
+
+  public static class TestEndpoint extends HBaseInterClusterReplicationEndpoint {
+
+    private final static BlockingQueue<Entry> entryQueue = new LinkedBlockingQueue<>();
+
+    public static void reset() {
+      entryQueue.clear();
+    }
+
+    public static List<Entry> getEntries() {
+      return new ArrayList<>(entryQueue);
+    }
+
+    @Override
+    public boolean canReplicateToSameCluster() {
+      return true;
+    }
+
+    @Override
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
+      return () -> {
+        entryQueue.addAll(entries);
+        return ordinal;
+      };
+    }
+
+    @Override
+    public synchronized List<ServerName> getRegionServers() {
+      // Return multiple server names for endpoint parallel replication.
+      return new ArrayList<>(
+          ImmutableList.of(ServerName.valueOf("www.example.com", 12016, 1525245876026L),
+            ServerName.valueOf("www.example2.com", 12016, 1525245876026L),
+            ServerName.valueOf("www.example3.com", 12016, 1525245876026L),
+            ServerName.valueOf("www.example4.com", 12016, 1525245876026L),
+            ServerName.valueOf("www.example4.com", 12016, 1525245876026L)));
+    }
+  }
+}