Posted to commits@hbase.apache.org by ch...@apache.org on 2022/06/22 12:02:01 UTC

[hbase] branch master updated: HBASE-27062 Remove ThreadPool in HBaseInterClusterReplicationEndpoint when use AsyncClusterConnection (#4560)

This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new de804938b98 HBASE-27062 Remove ThreadPool in HBaseInterClusterReplicationEndpoint when use AsyncClusterConnection (#4560)
de804938b98 is described below

commit de804938b98b232dc5a5e27953c7d60824c06422
Author: chenglei <ch...@apache.org>
AuthorDate: Wed Jun 22 20:01:56 2022 +0800

    HBASE-27062 Remove ThreadPool in HBaseInterClusterReplicationEndpoint when use AsyncClusterConnection (#4560)
---
 .../hbase/protobuf/ReplicationProtobufUtil.java    |  11 +-
 .../HBaseInterClusterReplicationEndpoint.java      | 211 +++++++++++----------
 .../hbase/replication/SyncReplicationTestBase.java |   9 +-
 .../hbase/replication/TestReplicationEndpoint.java |  13 +-
 .../replication/regionserver/TestReplicator.java   |  35 ++--
 .../TestSerialReplicationEndpoint.java             |  11 +-
 6 files changed, 154 insertions(+), 136 deletions(-)
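
In short: the endpoint used to hand every batch to a bounded ThreadPoolExecutor
as a blocking Callable and then drain an ExecutorCompletionService; with
AsyncClusterConnection the replicate RPC itself returns a CompletableFuture, so
the pool, and the termination-wait/abort logic that came with it (see the
HBASE-16081 reference in the removed code), can go away. A minimal,
self-contained sketch of the two submission styles; the names here are
illustrative placeholders, not HBase APIs:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.CompletionService;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class PoolVsFutures {

      // Old style: each batch parks a pool thread until its RPC returns.
      static void shipWithPool(List<List<String>> batches) throws Exception {
        ExecutorService exec = Executors.newFixedThreadPool(4);
        CompletionService<Integer> pool = new ExecutorCompletionService<>(exec);
        for (int i = 0; i < batches.size(); i++) {
          final int index = i;
          pool.submit(() -> index); // stand-in for a blocking replicate call
        }
        for (int i = 0; i < batches.size(); i++) {
          pool.take().get(); // results arrive in completion order
        }
        exec.shutdown();
      }

      // New style: the async client completes the future; no thread is parked.
      static void shipWithFutures(List<List<String>> batches) {
        List<CompletableFuture<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < batches.size(); i++) {
          // stand-in for an async replicate RPC yielding its batch index
          futures.add(CompletableFuture.completedFuture(i));
        }
        futures.forEach(CompletableFuture::join); // wait in submission order
      }
    }

One behavioral nuance of the rewrite: the CompletionService consumed results in
completion order, while the new gathering loop waits in submission order. Since
every batch has to succeed before the shipment is acknowledged anyway, the
outcome is the same.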

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
index c2e96ead6f7..cfdf0e12c85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellScanner;
@@ -28,7 +29,6 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
 import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
 import org.apache.hadoop.hbase.io.SizedCellScanner;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
@@ -37,6 +37,7 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
 
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
 
@@ -52,12 +53,12 @@ public class ReplicationProtobufUtil {
    * @param sourceBaseNamespaceDir Path to source cluster base namespace directory
    * @param sourceHFileArchiveDir  Path to the source cluster hfile archive directory
    */
-  public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
-    String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
-    int timeout) throws IOException {
+  public static CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
+    AsyncRegionServerAdmin admin, Entry[] entries, String replicationClusterId,
+    Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) {
     Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
       replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
-    FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
+    return admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout);
   }
 
   /**
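
Callers that still want the old blocking behavior simply wrap the returned
future, which is exactly what the test changes later in this patch do. A sketch
of both calling styles on top of the new signature (the shim class and its
parameter names are illustrative; the types and helpers are the ones imported
in the hunk above):

    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
    import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
    import org.apache.hadoop.hbase.util.FutureUtils;
    import org.apache.hadoop.hbase.wal.WAL.Entry;

    import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;

    class BlockingShim {
      // Old-style synchronous call on top of the new async signature.
      static ReplicateWALEntryResponse replicateSync(AsyncRegionServerAdmin admin,
          Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
          Path sourceHFileArchiveDir, int timeout) throws IOException {
        // FutureUtils.get blocks and rethrows the failure cause as an IOException.
        return FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(admin, entries,
          replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir, timeout));
      }
    }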
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index cec360a4c97..39e68bf9eb4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -29,18 +29,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
@@ -57,7 +50,7 @@ import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.ipc.RemoteException;
@@ -65,7 +58,8 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
+import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;
 
 /**
  * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation for replicating
@@ -82,8 +76,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private static final Logger LOG =
     LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);
 
-  private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
-
   /** Drop edits for tables that have been deleted from the replication source and target */
   public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
     "hbase.replication.drop.on.deleted.table";
@@ -97,25 +89,22 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   private int maxRetriesMultiplier;
   // Socket timeouts require even bolder actions since we don't want to DDOS
   private int socketTimeoutMultiplier;
-  // Amount of time for shutdown to wait for all tasks to complete
-  private long maxTerminationWait;
   // Size limit for replication RPCs, in bytes
   private int replicationRpcLimit;
   // Metrics for this source
   private MetricsSource metrics;
   private boolean peersSelected = false;
   private String replicationClusterId = "";
-  private ThreadPoolExecutor exec;
   private int maxThreads;
   private Path baseNamespaceDir;
   private Path hfileArchiveDir;
   private boolean replicationBulkLoadDataEnabled;
-  private Abortable abortable;
   private boolean dropOnDeletedTables;
   private boolean dropOnDeletedColumnFamilies;
   private boolean isSerial = false;
   // Initialising as 0 to guarantee at least one logging message
   private long lastSinkFetchTime = 0;
+  private volatile boolean stopping = false;
 
   @Override
   public void init(Context context) throws IOException {
@@ -124,20 +113,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
     this.socketTimeoutMultiplier =
       this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier);
-    // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
-    // tasks to terminate when doStop() is called.
-    long maxTerminationWaitMultiplier = this.conf.getLong(
-      "replication.source.maxterminationmultiplier", DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
-    this.maxTerminationWait = maxTerminationWaitMultiplier
-      * this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
     this.metrics = context.getMetrics();
     // per sink thread pool
     this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
       HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
-    this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
-      new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
-    this.abortable = ctx.getAbortable();
     // Set the size limit for replication RPCs to 95% of the max request size.
     // We could do with less slop if we have an accurate estimate of encoded size. Being
     // conservative for now.
@@ -394,30 +374,31 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     return entryList;
   }
 
-  private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
-    List<List<Entry>> batches) throws IOException {
-    int futures = 0;
+  private long parallelReplicate(ReplicateContext replicateContext, List<List<Entry>> batches)
+    throws IOException {
+    List<CompletableFuture<Integer>> futures =
+      new ArrayList<CompletableFuture<Integer>>(batches.size());
     for (int i = 0; i < batches.size(); i++) {
       List<Entry> entries = batches.get(i);
-      if (!entries.isEmpty()) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
-            replicateContext.getSize());
-        }
-        // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
-        pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
-        futures++;
+      if (entries.isEmpty()) {
+        continue;
       }
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
+          replicateContext.getSize());
+      }
+      // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
+      futures.add(asyncReplicate(entries, i, replicateContext.getTimeout()));
     }
 
     IOException iox = null;
     long lastWriteTime = 0;
-    for (int i = 0; i < futures; i++) {
+
+    for (CompletableFuture<Integer> f : futures) {
       try {
         // wait for all futures, remove successful parts
         // (only the remaining parts will be retried)
-        Future<Integer> f = pool.take();
-        int index = f.get();
+        int index = FutureUtils.get(f);
         List<Entry> batch = batches.get(index);
         batches.set(index, Collections.emptyList()); // remove successful batch
         // Find the most recent write time in the batch
@@ -425,12 +406,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         if (writeTime > lastWriteTime) {
           lastWriteTime = writeTime;
         }
-      } catch (InterruptedException ie) {
-        iox = new IOException(ie);
-      } catch (ExecutionException ee) {
-        iox = ee.getCause() instanceof IOException
-          ? (IOException) ee.getCause()
-          : new IOException(ee.getCause());
+      } catch (IOException e) {
+        iox = e;
+      } catch (RuntimeException e) {
+        iox = new IOException(e);
       }
     }
     if (iox != null) {
@@ -445,7 +424,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
    */
   @Override
   public boolean replicate(ReplicateContext replicateContext) {
-    CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
     int sleepMultiplier = 1;
 
     if (!peersSelected && this.isRunning()) {
@@ -468,7 +446,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     }
 
     List<List<Entry>> batches = createBatches(replicateContext.getEntries());
-    while (this.isRunning() && !exec.isShutdown()) {
+    while (this.isRunning() && !this.stopping) {
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
           sleepMultiplier++;
@@ -477,7 +455,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       }
       try {
         // replicate the batches to sink side.
-        parallelReplicate(pool, replicateContext, batches);
+        parallelReplicate(replicateContext, batches);
         return true;
       } catch (IOException ioe) {
         if (ioe instanceof RemoteException) {
@@ -532,82 +510,117 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
 
   @Override
   protected void doStop() {
-    disconnect(); // don't call super.doStop()
     // Allow currently running replication tasks to finish
-    exec.shutdown();
-    try {
-      exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
-    } catch (InterruptedException e) {
-    }
-    // Abort if the tasks did not terminate in time
-    if (!exec.isTerminated()) {
-      String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The "
-        + "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. "
-        + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
-      abortable.abort(errMsg, new IOException(errMsg));
-    }
+    this.stopping = true;
+    disconnect(); // don't call super.doStop()
     notifyStopped();
   }
 
-  protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
-    throws IOException {
+  protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
+    int timeout) {
+    int entriesHashCode = System.identityHashCode(entries);
+    if (LOG.isTraceEnabled()) {
+      long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+      LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", logPeerId(),
+        entriesHashCode, entries.size(), size, replicationClusterId);
+    }
     SinkPeer sinkPeer = null;
+    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
     try {
-      int entriesHashCode = System.identityHashCode(entries);
-      if (LOG.isTraceEnabled()) {
-        long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
-        LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
-          logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
-      }
       sinkPeer = getReplicationSink();
-      AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
-      try {
-        ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
-          entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
-          hfileArchiveDir, timeout);
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
-        }
-      } catch (IOException e) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
+    } catch (IOException e) {
+      this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer);
+      resultCompletableFuture.completeExceptionally(e);
+      return resultCompletableFuture;
+    }
+    assert sinkPeer != null;
+    AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
+    final SinkPeer sinkPeerToUse = sinkPeer;
+    FutureUtils.addListener(
+      ReplicationProtobufUtil.replicateWALEntry(rsAdmin, entries.toArray(new Entry[entries.size()]),
+        replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout),
+      (response, exception) -> {
+        if (exception != null) {
+          onReplicateWALEntryException(entriesHashCode, exception, sinkPeerToUse);
+          resultCompletableFuture.completeExceptionally(exception);
+          return;
         }
-        throw e;
-      }
-      reportSinkSuccess(sinkPeer);
-    } catch (IOException ioe) {
+        reportSinkSuccess(sinkPeerToUse);
+        resultCompletableFuture.complete(batchIndex);
+      });
+    return resultCompletableFuture;
+  }
+
+  private void onReplicateWALEntryException(int entriesHashCode, Throwable exception,
+    final SinkPeer sinkPeer) {
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, exception);
+    }
+    if (exception instanceof IOException) {
       if (sinkPeer != null) {
         reportBadSink(sinkPeer);
       }
-      throw ioe;
     }
-    return batchIndex;
   }
 
-  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
-    throws IOException {
-    int batchSize = 0, index = 0;
+  /**
+   * Here, when {@link HBaseInterClusterReplicationEndpoint#isSerial} is true, we iterate over the
+   * WAL {@link Entry} list; once we reach the batch limit, we send the batch out, and in the
+   * callback we send the next batch, until all entries have been sent.
+   */
+  private CompletableFuture<Integer> serialReplicateRegionEntries(
+    PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int timeout) {
+    if (!walEntryPeekingIterator.hasNext()) {
+      return CompletableFuture.completedFuture(batchIndex);
+    }
+    int batchSize = 0;
     List<Entry> batch = new ArrayList<>();
-    for (Entry entry : entries) {
+    while (walEntryPeekingIterator.hasNext()) {
+      Entry entry = walEntryPeekingIterator.peek();
       int entrySize = getEstimatedEntrySize(entry);
       if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
-        replicateEntries(batch, index++, timeout);
-        batch.clear();
-        batchSize = 0;
+        break;
       }
+      walEntryPeekingIterator.next();
       batch.add(entry);
       batchSize += entrySize;
     }
-    if (batchSize > 0) {
-      replicateEntries(batch, index, timeout);
+
+    if (batchSize <= 0) {
+      return CompletableFuture.completedFuture(batchIndex);
     }
-    return batchIndex;
+    final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
+    FutureUtils.addListener(replicateEntries(batch, batchIndex, timeout), (response, exception) -> {
+      if (exception != null) {
+        resultCompletableFuture.completeExceptionally(exception);
+        return;
+      }
+      if (!walEntryPeekingIterator.hasNext()) {
+        resultCompletableFuture.complete(batchIndex);
+        return;
+      }
+      FutureUtils.addListener(
+        serialReplicateRegionEntries(walEntryPeekingIterator, batchIndex, timeout),
+        (currentResponse, currentException) -> {
+          if (currentException != null) {
+            resultCompletableFuture.completeExceptionally(currentException);
+            return;
+          }
+          resultCompletableFuture.complete(batchIndex);
+        });
+    });
+    return resultCompletableFuture;
   }
 
-  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
+  /**
+   * Replicate entries to the peer cluster using the async API.
+   */
+  protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int batchIndex,
+    int timeout) {
     return isSerial
-      ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
-      : () -> replicateEntries(entries, batchIndex, timeout);
+      ? serialReplicateRegionEntries(Iterators.peekingIterator(entries.iterator()), batchIndex,
+        timeout)
+      : replicateEntries(entries, batchIndex, timeout);
   }
 
   private String logPeerId() {
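
The serial path above keeps per-region ordering without blocking a thread by
recursing inside the callback: each completed batch schedules the next one. A
stripped-down version of that chaining pattern, generic over any async send
(sendAsync is a placeholder function, not an HBase method):

    import java.util.Iterator;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.function.Function;

    class SerialChain {
      // Ship batches strictly one after another; the returned future completes
      // when the last batch is done, or fails as soon as any batch fails.
      static <T> CompletableFuture<Void> sendSerially(Iterator<List<T>> batches,
          Function<List<T>, CompletableFuture<Void>> sendAsync) {
        if (!batches.hasNext()) {
          return CompletableFuture.completedFuture(null);
        }
        // thenCompose starts the next batch only after the current one succeeds.
        return sendAsync.apply(batches.next())
          .thenCompose(ignored -> sendSerially(batches, sendAsync));
      }
    }

The patch expresses the same idea with explicit FutureUtils.addListener
callbacks instead of thenCompose, and completes the caller's future with the
batch index rather than Void.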
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index 011f0a19f0c..e82d69826d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -267,14 +268,14 @@ public class SyncReplicationTestBase {
         new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
     }
     if (!expectedRejection) {
-      ReplicationProtobufUtil.replicateWALEntry(
+      FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
         connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
-        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
     } else {
       try {
-        ReplicationProtobufUtil.replicateWALEntry(
+        FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
           connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
-          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+          HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
         fail("Should throw IOException when sync-replication state is in A or DA");
       } catch (RemoteException e) {
         assertRejection(e.unwrapRemoteException());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 53512ec2af8..9bc632e223b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -30,7 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.UUID;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
@@ -556,15 +556,16 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     }
 
     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
+    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+      int timeout) {
       // Fail only once, we don't want to slow down the test.
       if (failedOnce) {
-        return () -> ordinal;
+        return CompletableFuture.completedFuture(ordinal);
       } else {
         failedOnce = true;
-        return () -> {
-          throw new IOException("Sample Exception: Failed to replicate.");
-        };
+        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
+        future.completeExceptionally(new IOException("Sample Exception: Failed to replicate."));
+        return future;
       }
     }
   }
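
Building the pre-failed future by hand, as above, is presumably the Java
8-compatible spelling; on Java 9+ the same thing would be a one-liner (sketch):

    import java.io.IOException;
    import java.util.concurrent.CompletableFuture;

    class FailedFutureExample {
      static CompletableFuture<Integer> failed() {
        // Java 9+ shorthand for new CompletableFuture + completeExceptionally
        return CompletableFuture.failedFuture(
          new IOException("Sample Exception: Failed to replicate."));
      }
    }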
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
index 803c4278f97..c48755fb5f0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -228,15 +228,15 @@ public class TestReplicator extends TestReplicationBase {
     }
 
     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
-      return () -> {
-        int batchIndex = replicateEntries(entries, ordinal, timeout);
+    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+      int timeout) {
+      return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
         entriesCount += entries.size();
         int count = batchCount.incrementAndGet();
         LOG.info(
           "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
-        return batchIndex;
-      };
+      });
+
     }
   }
 
@@ -245,20 +245,23 @@ public class TestReplicator extends TestReplicationBase {
     private final AtomicBoolean failNext = new AtomicBoolean(false);
 
     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
-      return () -> {
-        if (failNext.compareAndSet(false, true)) {
-          int batchIndex = replicateEntries(entries, ordinal, timeout);
+    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+      int timeout) {
+
+      if (failNext.compareAndSet(false, true)) {
+        return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
           entriesCount += entries.size();
           int count = batchCount.incrementAndGet();
           LOG.info(
             "Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
-          return batchIndex;
-        } else if (failNext.compareAndSet(true, false)) {
-          throw new ServiceException("Injected failure");
-        }
-        return ordinal;
-      };
+        });
+      } else if (failNext.compareAndSet(true, false)) {
+        CompletableFuture<Integer> future = new CompletableFuture<Integer>();
+        future.completeExceptionally(new ServiceException("Injected failure"));
+        return future;
+      }
+      return CompletableFuture.completedFuture(ordinal);
+
     }
   }
 }
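
A detail worth calling out in the test doubles above: whenComplete runs for
success and failure alike, and it returns a future that preserves the outcome
of its input, so asyncReplicate still yields the batch index (or the original
exception) that parallelReplicate expects. A tiny self-contained demonstration:

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;

    class WhenCompleteDemo {
      public static void main(String[] args) {
        AtomicInteger batchCount = new AtomicInteger();
        CompletableFuture<Integer> f = CompletableFuture.completedFuture(7)
          // observes the outcome without altering it
          .whenComplete((index, error) -> batchCount.incrementAndGet());
        // prints "7 after 1 batch(es)"
        System.out.println(f.join() + " after " + batchCount.get() + " batch(es)");
      }
    }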
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
index c0eace0bbda..5f99b88e0a4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.LinkedBlockingQueue;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -165,11 +165,10 @@ public class TestSerialReplicationEndpoint {
     }
 
     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
-      return () -> {
-        entryQueue.addAll(entries);
-        return ordinal;
-      };
+    protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+      int timeout) {
+      entryQueue.addAll(entries);
+      return CompletableFuture.completedFuture(ordinal);
     }
 
     @Override
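
For endpoints that subclassed HBaseInterClusterReplicationEndpoint, the
override point moves from createReplicator (a Callable) to asyncReplicate (a
CompletableFuture), as the test doubles in this patch illustrate. A minimal
custom endpoint under the new contract (the class name and logging are
illustrative; replicateEntries is the real protected hook from the patch):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
    import org.apache.hadoop.hbase.wal.WAL.Entry;

    public class LoggingReplicationEndpoint extends HBaseInterClusterReplicationEndpoint {
      @Override
      protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int batchIndex,
          int timeout) {
        // Delegate to the real async shipping, then observe the outcome.
        return replicateEntries(entries, batchIndex, timeout)
          .whenComplete((index, error) -> {
            if (error == null) {
              System.out.println("batch " + index + " shipped (" + entries.size() + " entries)");
            }
          });
      }
    }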