Posted to commits@hbase.apache.org by ch...@apache.org on 2022/06/22 11:47:19 UTC

[hbase] branch revert-4559-revert-4463-removetp created (now 1e4fcac12a7)

This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a change to branch revert-4559-revert-4463-removetp
in repository https://gitbox.apache.org/repos/asf/hbase.git


      at 1e4fcac12a7 Revert "Revert "HBASE-27062 ThreadPool is unnecessary in HBaseInterClusterReplication… (#4463)" (#4559)"

This branch includes the following new commits:

     new 1e4fcac12a7 Revert "Revert "HBASE-27062 ThreadPool is unnecessary in HBaseInterClusterReplication… (#4463)" (#4559)"

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



[hbase] 01/01: Revert "Revert "HBASE-27062 ThreadPool is unnecessary in HBaseInterClusterReplication… (#4463)" (#4559)"

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch revert-4559-revert-4463-removetp
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 1e4fcac12a7bfcc7fde4d48984bff7fbb94da7d9
Author: chenglei <ch...@apache.org>
AuthorDate: Wed Jun 22 19:47:12 2022 +0800

    Revert "Revert "HBASE-27062 ThreadPool is unnecessary in HBaseInterClusterReplication… (#4463)" (#4559)"
    
    This reverts commit 902bc3ed70a332db97d07e41f38fe4bd2686990d.
---
 .../hbase/protobuf/ReplicationProtobufUtil.java    |  11 +-
 .../HBaseInterClusterReplicationEndpoint.java      | 211 +++++++++++----------
 .../hbase/replication/SyncReplicationTestBase.java |   9 +-
 .../hbase/replication/TestReplicationEndpoint.java |  13 +-
 .../replication/regionserver/TestReplicator.java   |  35 ++--
 .../TestSerialReplicationEndpoint.java             |  11 +-
 6 files changed, 154 insertions(+), 136 deletions(-)
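
The substance of the reverted-revert (HBASE-27062) is visible in the diff
below: the per-endpoint ThreadPoolExecutor/CompletionService machinery goes
away, and each batch of WAL entries instead yields a CompletableFuture<Integer>
that is completed from a listener on the sink RPC itself. As rough orientation,
here is a minimal standalone sketch of that callback pattern (names are
hypothetical; the real code uses HBase's FutureUtils.addListener and
AsyncRegionServerAdmin):

    import java.util.concurrent.CompletableFuture;

    class AsyncBatchSketch {
      // Stand-in for the async sink RPC (cf. AsyncRegionServerAdmin.replicateWALEntry).
      static CompletableFuture<Void> sendBatchRpc(int batchIndex) {
        return CompletableFuture.completedFuture(null); // hypothetical stub
      }

      // Instead of submitting a blocking Callable<Integer> to a thread pool, return
      // a future that the RPC callback completes with the index of the finished batch.
      static CompletableFuture<Integer> replicateBatch(int batchIndex) {
        CompletableFuture<Integer> result = new CompletableFuture<>();
        sendBatchRpc(batchIndex).whenComplete((resp, err) -> {
          if (err != null) {
            result.completeExceptionally(err); // the real code also reports a bad sink here
          } else {
            result.complete(batchIndex);       // success: the caller clears this batch
          }
        });
        return result;
      }
    }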

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
index c2e96ead6f7..cfdf0e12c85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -28,7 +29,6 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -37,6 +37,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
@@ -52,12 +53,12 @@ public class ReplicationProtobufUtil {
    * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
    * @param sourceHFileArchiveDir  Path to the source cluster hfile archive directory
    */
-  public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
-    String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
-    int timeout) throws IOException {
+  public static CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
+    AsyncRegionServerAdmin admin, Entry[] entries, String replicationClusterId,
+    Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) {
     Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
       replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
-    FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
+    return admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout);
   }
 
   /**
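
Callers that still want the old blocking semantics now wrap the returned
future themselves, as the test changes later in this patch do. A short
caller-side sketch (all variables hypothetical):

    // FutureUtils.get blocks on the future and rethrows the cause as an IOException.
    ReplicateWALEntryResponse resp = FutureUtils.get(ReplicationProtobufUtil
      .replicateWALEntry(admin, entries, clusterId, baseNsDir, hfileArchiveDir, timeout));
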
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index cec360a4c97..39e68bf9eb4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -29,18 +29,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
@@ -57,7 +50,7 @@ import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.ipc.RemoteException;
@@ -65,7 +58,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
+import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;
 
 /**
  * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation for replicating
@@ -82,8 +76,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private static final Logger LOG =
     LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);
 
-  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
-
   /** Drop edits for tables that have been deleted from the replication source and target */
   public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
     "hbase.replication.drop.on.deleted.table";
@@ -97,25 +89,22 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private int maxRetriesMultiplier;
   // Socket timeouts require even bolder actions since we don't want to DDOS
   private int socketTimeoutMultiplier;
-  // Amount of time for shutdown to wait for all tasks to complete
-  private long maxTerminationWait;
   // Size limit for replication RPCs, in bytes
   private int replicationRpcLimit;
   // Metrics for this source
   private MetricsSource metrics;
   private boolean peersSelected = false;
   private String replicationClusterId = "";
-  private ThreadPoolExecutor exec;
   private int maxThreads;
   private Path baseNamespaceDir;
   private Path hfileArchiveDir;
   private boolean replicationBulkLoadDataEnabled;
-  private Abortable abortable;
   private boolean dropOnDeletedTables;
   private boolean dropOnDeletedColumnFamilies;
   private boolean isSerial = false;
   // Initialising as 0 to guarantee at least one logging message
   private long lastSinkFetchTime = 0;
+  private volatile boolean stopping = false;
 
   @Override
   public void init(Context context) throws IOException {
@@ -124,20 +113,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.socketTimeoutMultiplier =
       this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier);
-    // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
-    // tasks to terminate when doStop() is called.
-    long maxTerminationWaitMultiplier = this.conf.getLong(
-      "replication.source.maxterminationmultiplier", DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
-    this.maxTerminationWait = maxTerminationWaitMultiplier
-      * this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
     this.metrics = context.getMetrics();
     // per sink thread pool
     this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
       HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
-    this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
-      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
-    this.abortable = ctx.getAbortable();
     // Set the size limit for replication RPCs to 95% of the max request size.
     // We could do with less slop if we have an accurate estimate of encoded size. Being
     // conservative for now.
@@ -394,30 +374,31 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     return entryList;
   }
 
-  private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
-    List<List<Entry>> batches) throws IOException {
-    int futures = 0;
+  private long parallelReplicate(ReplicateContext replicateContext, List<List<Entry>> batches)
+    throws IOException {
+    List<CompletableFuture<Integer>> futures =
+      new ArrayList<CompletableFuture<Integer>>(batches.size());
     for (int i = 0; i < batches.size(); i++) {
       List<Entry> entries = batches.get(i);
-      if (!entries.isEmpty()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
-            replicateContext.getSize());
-        }
-        // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
-        pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
-        futures++;
+      if (entries.isEmpty()) {
+        continue;
       }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
+          replicateContext.getSize());
+      }
+      // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
+      futures.add(asyncReplicate(entries, i, replicateContext.getTimeout()));
     }
 
     IOException iox = null;
     long lastWriteTime = 0;
-    for (int i = 0; i < futures; i++) {
+
+    for (CompletableFuture<Integer> f : futures) {
       try {
         // wait for all futures, remove successful parts
         // (only the remaining parts will be retried)
-        Future<Integer> f = pool.take();
-        int index = f.get();
+        int index = FutureUtils.get(f);
         List<Entry> batch = batches.get(index);
         batches.set(index, Collections.emptyList()); // remove successful batch
         // Find the most recent write time in the batch
@@ -425,12 +406,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         if (writeTime > lastWriteTime) {
           lastWriteTime = writeTime;
         }
-      } catch (InterruptedException ie) {
-        iox = new IOException(ie);
-      } catch (ExecutionException ee) {
-        iox = ee.getCause() instanceof IOException
-          ? (IOException) ee.getCause()
-          : new IOException(ee.getCause());
+      } catch (IOException e) {
+        iox = e;
+      } catch (RuntimeException e) {
+        iox = new IOException(e);
       }
     }
     if (iox != null) {
@@ -445,7 +424,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
    */
   @Override
   public boolean replicate(ReplicateContext replicateContext) {
-    CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
     int sleepMultiplier = 1;
 
     if (!peersSelected && this.isRunning()) {
@@ -468,7 +446,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     }
 
     List<List<Entry>> batches = createBatches(replicateContext.getEntries());
-    while (this.isRunning() && !exec.isShutdown()) {
+    while (this.isRunning() && !this.stopping) {
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
           sleepMultiplier++;
@@ -477,7 +455,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       }
       try {
         // replicate the batches to sink side.
-        parallelReplicate(pool, replicateContext, batches);
+        parallelReplicate(replicateContext, batches);
         return true;
       } catch (IOException ioe) {
         if (ioe instanceof RemoteException) {
@@ -532,82 +510,117 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
 
   @Override
   protected void doStop() {
-    disconnect(); // don't call super.doStop()
     // Allow currently running replication tasks to finish
-    exec.shutdown();
-    try {
-      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-    }
-    // Abort if the tasks did not terminate in time
-    if (!exec.isTerminated()) {
-      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The "
-        + "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. "
-        + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
-      abortable.abort(errMsg, new IOException(errMsg));
-    }
+    this.stopping = true;
+    disconnect(); // don't call super.doStop()
     notifyStopped();
   }
 
-  protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
-    throws IOException {
+  protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
+    int timeout) {
+    int entriesHashCode = System.identityHashCode(entries);
+    if (LOG.isTraceEnabled()) {
+      long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+      LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", logPeerId(),
+        entriesHashCode, entries.size(), size, replicationClusterId);
+    }
     SinkPeer sinkPeer = null;
+    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
     try {
-      int entriesHashCode = System.identityHashCode(entries);
-      if (LOG.isTraceEnabled()) {
-        long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
-        LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
-          logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
-      }
       sinkPeer = getReplicationSink();
-      AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
-      try {
-        ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
-          entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
-          hfileArchiveDir, timeout);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
-        }
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
+    } catch (IOException e) {
+      this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer);
+      resultCompletableFuture.completeExceptionally(e);
+      return resultCompletableFuture;
+    }
+    assert sinkPeer != null;
+    AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
+    final SinkPeer sinkPeerToUse = sinkPeer;
+    FutureUtils.addListener(
+      ReplicationProtobufUtil.replicateWALEntry(rsAdmin, entries.toArray(new Entry[entries.size()]),
+        replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout),
+      (response, exception) -> {
+        if (exception != null) {
+          onReplicateWALEntryException(entriesHashCode, exception, sinkPeerToUse);
+          resultCompletableFuture.completeExceptionally(exception);
+          return;
         }
-        throw e;
-      }
-      reportSinkSuccess(sinkPeer);
-    } catch (IOException ioe) {
+        reportSinkSuccess(sinkPeerToUse);
+        resultCompletableFuture.complete(batchIndex);
+      });
+    return resultCompletableFuture;
+  }
+
+  private void onReplicateWALEntryException(int entriesHashCode, Throwable exception,
+    final SinkPeer sinkPeer) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, exception);
+    }
+    if (exception instanceof IOException) {
       if (sinkPeer != null) {
         reportBadSink(sinkPeer);
       }
-      throw ioe;
     }
-    return batchIndex;
   }
 
-  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
-    throws IOException {
-    int batchSize = 0, index = 0;
+  /**
+   * Here, when {@link HBaseInterClusterReplicationEndpoint#isSerial} is true, we iterate over the
+   * WAL {@link Entry} list; once we reach the batch limit, we send that batch out and, in its
+   * callback, send the next batch, until all entries have been sent.
+   */
+  private CompletableFuture<Integer> serialReplicateRegionEntries(
+    PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int timeout) {
+    if (!walEntryPeekingIterator.hasNext()) {
+      return CompletableFuture.completedFuture(batchIndex);
+    }
+    int batchSize = 0;
     List<Entry> batch = new ArrayList<>();
-    for (Entry entry : entries) {
+    while (walEntryPeekingIterator.hasNext()) {
+      Entry entry = walEntryPeekingIterator.peek();
       int entrySize = getEstimatedEntrySize(entry);
       if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
-        replicateEntries(batch, index++, timeout);
-        batch.clear();
-        batchSize = 0;
+        break;
       }
+      walEntryPeekingIterator.next();
       batch.add(entry);
       batchSize += entrySize;
     }
-    if (batchSize > 0) {
-      replicateEntries(batch, index, timeout);
+
+    if (batchSize <= 0) {
+      return CompletableFuture.completedFuture(batchIndex);
     }
-    return batchIndex;
+    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
+    FutureUtils.addListener(replicateEntries(batch, batchIndex, timeout), (response, exception) -> {
+      if (exception != null) {
+        resultCompletableFuture.completeExceptionally(exception);
+        return;
+      }
+      if (!walEntryPeekingIterator.hasNext()) {
+        resultCompletableFuture.complete(batchIndex);
+        return;
+      }
+      FutureUtils.addListener(
+        serialReplicateRegionEntries(walEntryPeekingIterator, batchIndex, timeout),
+        (currentResponse, currentException) -> {
+          if (currentException != null) {
+            resultCompletableFuture.completeExceptionally(currentException);
+            return;
+          }
+          resultCompletableFuture.complete(batchIndex);
+        });
+    });
+    return resultCompletableFuture;
   }
 
-  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
+  /**
+   * Replicate entries to the peer cluster via the async API.
+   */
+  protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int batchIndex,
+    int timeout) {
     return isSerial
-      ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
-      : () -> replicateEntries(entries, batchIndex, timeout);
+      ? serialReplicateRegionEntries(Iterators.peekingIterator(entries.iterator()), batchIndex,
+        timeout)
+      : replicateEntries(entries, batchIndex, timeout);
   }
 
   private String logPeerId() {
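
The trickiest part above is the serial path: serialReplicateRegionEntries now
chains batches recursively, sending one batch and, in its callback, either
completing the overall result or kicking off the next batch. A compact
standalone sketch of that chaining (names hypothetical; stands in for the
PeekingIterator-driven batching in the real code):

    import java.util.Iterator;
    import java.util.concurrent.CompletableFuture;

    class SerialChainSketch {
      // Stand-in for replicateEntries(batch, index, timeout).
      static CompletableFuture<Integer> sendOne(int item, int index) {
        return CompletableFuture.completedFuture(index); // hypothetical stub
      }

      // Send items strictly one at a time: each send's callback triggers the next,
      // so ordering is preserved without parking a thread between sends.
      static CompletableFuture<Integer> sendSerially(Iterator<Integer> it, int index) {
        if (!it.hasNext()) {
          return CompletableFuture.completedFuture(index);
        }
        CompletableFuture<Integer> result = new CompletableFuture<>();
        sendOne(it.next(), index).whenComplete((i, err) -> {
          if (err != null) {
            result.completeExceptionally(err);
          } else if (!it.hasNext()) {
            result.complete(index);
          } else {
            sendSerially(it, index).whenComplete((j, err2) -> {
              if (err2 != null) {
                result.completeExceptionally(err2);
              } else {
                result.complete(index);
              }
            });
          }
        });
        return result;
      }
    }
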
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index 011f0a19f0c..e82d69826d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -267,14 +268,14 @@ public class SyncReplicationTestBase {
         new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
     }
     if (!expectedRejection) {
-      ReplicationProtobufUtil.replicateWALEntry(
+      FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
         connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
-        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
     } else {
       try {
-        ReplicationProtobufUtil.replicateWALEntry(
+        FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
           connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
-          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
         fail("Should throw IOException when sync-replication state is in A or DA");
       } catch (RemoteException e) {
         assertRejection(e.unwrapRemoteException());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 53512ec2af8..9bc632e223b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -30,7 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -556,15 +556,16 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     }
 
     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
+    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+      int timeout) {
       // Fail only once, we don't want to slow down the test.
       if (failedOnce) {
-        return () -> ordinal;
+        return CompletableFuture.completedFuture(ordinal);
       } else {
         failedOnce = true;
-        return () -> {
-          throw new IOException("Sample Exception: Failed to replicate.");
-        };
+        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
+        future.completeExceptionally(new IOException("Sample Exception: Failed to replicate."));
+        return future;
       }
     }
   }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
index 803c4278f97..c48755fb5f0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -228,15 +228,15 @@ public class TestReplicator extends TestReplicationBase {
     }
 
     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
-      return () -> {
-        int batchIndex = replicateEntries(entries, ordinal, timeout);
+    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+      int timeout) {
+      return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
         entriesCount += entries.size();
         int count = batchCount.incrementAndGet();
         LOG.info(
           "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
-        return batchIndex;
-      };
+      });
+
     }
   }
 
@@ -245,20 +245,23 @@ public class TestReplicator extends TestReplicationBase {
     private final AtomicBoolean failNext = new AtomicBoolean(false);
 
     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
-      return () -> {
-        if (failNext.compareAndSet(false, true)) {
-          int batchIndex = replicateEntries(entries, ordinal, timeout);
+    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+      int timeout) {
+
+      if (failNext.compareAndSet(false, true)) {
+        return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
           entriesCount += entries.size();
           int count = batchCount.incrementAndGet();
           LOG.info(
             "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
-          return batchIndex;
-        } else if (failNext.compareAndSet(true, false)) {
-          throw new ServiceException("Injected failure");
-        }
-        return ordinal;
-      };
+        });
+      } else if (failNext.compareAndSet(true, false)) {
+        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
+        future.completeExceptionally(new ServiceException("Injected failure"));
+        return future;
+      }
+      return CompletableFuture.completedFuture(ordinal);
+
     }
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
index c0eace0bbda..5f99b88e0a4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -165,11 +165,10 @@ public class TestSerialReplicationEndpoint {
     }
 
     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
-      return () -> {
-        entryQueue.addAll(entries);
-        return ordinal;
-      };
+    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+      int timeout) {
+      entryQueue.addAll(entries);
+      return CompletableFuture.completedFuture(ordinal);
     }
 
     @Override