You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2022/06/22 12:02:01 UTC
[hbase] branch master updated: HBASE-27062 Remove ThreadPool in HBaseInterClusterReplicationEndpoint when use AsyncClusterConnection (#4560)
This is an automated email from the ASF dual-hosted git repository.
chenglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new de804938b98 HBASE-27062 Remove ThreadPool in HBaseInterClusterReplicationEndpoint when use AsyncClusterConnection (#4560)
de804938b98 is described below
commit de804938b98b232dc5a5e27953c7d60824c06422
Author: chenglei <ch...@apache.org>
AuthorDate: Wed Jun 22 20:01:56 2022 +0800
HBASE-27062 Remove ThreadPool in HBaseInterClusterReplicationEndpoint when use AsyncClusterConnection (#4560)
---
.../hbase/protobuf/ReplicationProtobufUtil.java | 11 +-
.../HBaseInterClusterReplicationEndpoint.java | 211 +++++++++++----------
.../hbase/replication/SyncReplicationTestBase.java | 9 +-
.../hbase/replication/TestReplicationEndpoint.java | 13 +-
.../replication/regionserver/TestReplicator.java | 35 ++--
.../TestSerialReplicationEndpoint.java | 11 +-
6 files changed, 154 insertions(+), 136 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
index c2e96ead6f7..cfdf0e12c85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtobufUtil.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@@ -28,7 +29,6 @@ import org.apache.hadoop.hbase.PrivateCellUtil;
import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
import org.apache.hadoop.hbase.io.SizedCellScanner;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -37,6 +37,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@@ -52,12 +53,12 @@ public class ReplicationProtobufUtil {
* @param sourceBaseNamespaceDir Path to source cluster base namespace directory
* @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
*/
- public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
- String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir,
- int timeout) throws IOException {
+ public static CompletableFuture<ReplicateWALEntryResponse> replicateWALEntry(
+ AsyncRegionServerAdmin admin, Entry[] entries, String replicationClusterId,
+ Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir, int timeout) {
Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
- FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout));
+ return admin.replicateWALEntry(p.getFirst(), p.getSecond(), timeout);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index cec360a4c97..39e68bf9eb4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -29,18 +29,11 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -57,7 +50,7 @@ import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.ipc.RemoteException;
@@ -65,7 +58,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hbase.thirdparty.com.google.common.collect.Iterators;
+import org.apache.hbase.thirdparty.com.google.common.collect.PeekingIterator;
/**
* A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} implementation for replicating
@@ -82,8 +76,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private static final Logger LOG =
LoggerFactory.getLogger(HBaseInterClusterReplicationEndpoint.class);
- private static final long DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER = 2;
-
/** Drop edits for tables that been deleted from the replication source and target */
public static final String REPLICATION_DROP_ON_DELETED_TABLE_KEY =
"hbase.replication.drop.on.deleted.table";
@@ -97,25 +89,22 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private int maxRetriesMultiplier;
// Socket timeouts require even bolder actions since we don't want to DDOS
private int socketTimeoutMultiplier;
- // Amount of time for shutdown to wait for all tasks to complete
- private long maxTerminationWait;
// Size limit for replication RPCs, in bytes
private int replicationRpcLimit;
// Metrics for this source
private MetricsSource metrics;
private boolean peersSelected = false;
private String replicationClusterId = "";
- private ThreadPoolExecutor exec;
private int maxThreads;
private Path baseNamespaceDir;
private Path hfileArchiveDir;
private boolean replicationBulkLoadDataEnabled;
- private Abortable abortable;
private boolean dropOnDeletedTables;
private boolean dropOnDeletedColumnFamilies;
private boolean isSerial = false;
// Initialising as 0 to guarantee at least one logging message
private long lastSinkFetchTime = 0;
+ private volatile boolean stopping = false;
@Override
public void init(Context context) throws IOException {
@@ -124,20 +113,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.maxRetriesMultiplier = this.conf.getInt("replication.source.maxretriesmultiplier", 300);
this.socketTimeoutMultiplier =
this.conf.getInt("replication.source.socketTimeoutMultiplier", maxRetriesMultiplier);
- // A Replicator job is bound by the RPC timeout. We will wait this long for all Replicator
- // tasks to terminate when doStop() is called.
- long maxTerminationWaitMultiplier = this.conf.getLong(
- "replication.source.maxterminationmultiplier", DEFAULT_MAX_TERMINATION_WAIT_MULTIPLIER);
- this.maxTerminationWait = maxTerminationWaitMultiplier
- * this.conf.getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
this.sleepForRetries = this.conf.getLong("replication.source.sleepforretries", 1000);
this.metrics = context.getMetrics();
// per sink thread pool
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
- this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
- new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
- this.abortable = ctx.getAbortable();
// Set the size limit for replication RPCs to 95% of the max request size.
// We could do with less slop if we have an accurate estimate of encoded size. Being
// conservative for now.
@@ -394,30 +374,31 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
return entryList;
}
- private long parallelReplicate(CompletionService<Integer> pool, ReplicateContext replicateContext,
- List<List<Entry>> batches) throws IOException {
- int futures = 0;
+ private long parallelReplicate(ReplicateContext replicateContext, List<List<Entry>> batches)
+ throws IOException {
+ List<CompletableFuture<Integer>> futures =
+ new ArrayList<CompletableFuture<Integer>>(batches.size());
for (int i = 0; i < batches.size(); i++) {
List<Entry> entries = batches.get(i);
- if (!entries.isEmpty()) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
- replicateContext.getSize());
- }
- // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
- pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
- futures++;
+ if (entries.isEmpty()) {
+ continue;
}
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} Submitting {} entries of total size {}", logPeerId(), entries.size(),
+ replicateContext.getSize());
+ }
+ // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
+ futures.add(asyncReplicate(entries, i, replicateContext.getTimeout()));
}
IOException iox = null;
long lastWriteTime = 0;
- for (int i = 0; i < futures; i++) {
+
+ for (CompletableFuture<Integer> f : futures) {
try {
// wait for all futures, remove successful parts
// (only the remaining parts will be retried)
- Future<Integer> f = pool.take();
- int index = f.get();
+ int index = FutureUtils.get(f);
List<Entry> batch = batches.get(index);
batches.set(index, Collections.emptyList()); // remove successful batch
// Find the most recent write time in the batch
@@ -425,12 +406,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
if (writeTime > lastWriteTime) {
lastWriteTime = writeTime;
}
- } catch (InterruptedException ie) {
- iox = new IOException(ie);
- } catch (ExecutionException ee) {
- iox = ee.getCause() instanceof IOException
- ? (IOException) ee.getCause()
- : new IOException(ee.getCause());
+ } catch (IOException e) {
+ iox = e;
+ } catch (RuntimeException e) {
+ iox = new IOException(e);
}
}
if (iox != null) {
@@ -445,7 +424,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
*/
@Override
public boolean replicate(ReplicateContext replicateContext) {
- CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
int sleepMultiplier = 1;
if (!peersSelected && this.isRunning()) {
@@ -468,7 +446,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
}
List<List<Entry>> batches = createBatches(replicateContext.getEntries());
- while (this.isRunning() && !exec.isShutdown()) {
+ while (this.isRunning() && !this.stopping) {
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
@@ -477,7 +455,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
}
try {
// replicate the batches to sink side.
- parallelReplicate(pool, replicateContext, batches);
+ parallelReplicate(replicateContext, batches);
return true;
} catch (IOException ioe) {
if (ioe instanceof RemoteException) {
@@ -532,82 +510,117 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
@Override
protected void doStop() {
- disconnect(); // don't call super.doStop()
// Allow currently running replication tasks to finish
- exec.shutdown();
- try {
- exec.awaitTermination(maxTerminationWait, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- }
- // Abort if the tasks did not terminate in time
- if (!exec.isTerminated()) {
- String errMsg = "HBaseInterClusterReplicationEndpoint termination failed. The "
- + "ThreadPoolExecutor failed to finish all tasks within " + maxTerminationWait + "ms. "
- + "Aborting to prevent Replication from deadlocking. See HBASE-16081.";
- abortable.abort(errMsg, new IOException(errMsg));
- }
+ this.stopping = true;
+ disconnect(); // don't call super.doStop()
notifyStopped();
}
- protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
- throws IOException {
+ protected CompletableFuture<Integer> replicateEntries(List<Entry> entries, int batchIndex,
+ int timeout) {
+ int entriesHashCode = System.identityHashCode(entries);
+ if (LOG.isTraceEnabled()) {
+ long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
+ LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}", logPeerId(),
+ entriesHashCode, entries.size(), size, replicationClusterId);
+ }
SinkPeer sinkPeer = null;
+ final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
try {
- int entriesHashCode = System.identityHashCode(entries);
- if (LOG.isTraceEnabled()) {
- long size = entries.stream().mapToLong(this::getEstimatedEntrySize).sum();
- LOG.trace("{} Replicating batch {} of {} entries with total size {} bytes to {}",
- logPeerId(), entriesHashCode, entries.size(), size, replicationClusterId);
- }
sinkPeer = getReplicationSink();
- AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
- try {
- ReplicationProtobufUtil.replicateWALEntry(rsAdmin,
- entries.toArray(new Entry[entries.size()]), replicationClusterId, baseNamespaceDir,
- hfileArchiveDir, timeout);
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
- }
- } catch (IOException e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, e);
+ } catch (IOException e) {
+ this.onReplicateWALEntryException(entriesHashCode, e, sinkPeer);
+ resultCompletableFuture.completeExceptionally(e);
+ return resultCompletableFuture;
+ }
+ assert sinkPeer != null;
+ AsyncRegionServerAdmin rsAdmin = sinkPeer.getRegionServer();
+ final SinkPeer sinkPeerToUse = sinkPeer;
+ FutureUtils.addListener(
+ ReplicationProtobufUtil.replicateWALEntry(rsAdmin, entries.toArray(new Entry[entries.size()]),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout),
+ (response, exception) -> {
+ if (exception != null) {
+ onReplicateWALEntryException(entriesHashCode, exception, sinkPeerToUse);
+ resultCompletableFuture.completeExceptionally(exception);
+ return;
}
- throw e;
- }
- reportSinkSuccess(sinkPeer);
- } catch (IOException ioe) {
+ reportSinkSuccess(sinkPeerToUse);
+ resultCompletableFuture.complete(batchIndex);
+ });
+ return resultCompletableFuture;
+ }
+
+ private void onReplicateWALEntryException(int entriesHashCode, Throwable exception,
+ final SinkPeer sinkPeer) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("{} Failed replicating batch {}", logPeerId(), entriesHashCode, exception);
+ }
+ if (exception instanceof IOException) {
if (sinkPeer != null) {
reportBadSink(sinkPeer);
}
- throw ioe;
}
- return batchIndex;
}
- private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
- throws IOException {
- int batchSize = 0, index = 0;
+ /**
+ * Used when {@link HBaseInterClusterReplicationEndpoint#isSerial} is true: we iterate over the
+ * WAL {@link Entry} list; once we reach the batch limit we send the batch out, and in the
+ * callback we send the next batch, until all entries have been sent.
+ */
+ private CompletableFuture<Integer> serialReplicateRegionEntries(
+ PeekingIterator<Entry> walEntryPeekingIterator, int batchIndex, int timeout) {
+ if (!walEntryPeekingIterator.hasNext()) {
+ return CompletableFuture.completedFuture(batchIndex);
+ }
+ int batchSize = 0;
List<Entry> batch = new ArrayList<>();
- for (Entry entry : entries) {
+ while (walEntryPeekingIterator.hasNext()) {
+ Entry entry = walEntryPeekingIterator.peek();
int entrySize = getEstimatedEntrySize(entry);
if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
- replicateEntries(batch, index++, timeout);
- batch.clear();
- batchSize = 0;
+ break;
}
+ walEntryPeekingIterator.next();
batch.add(entry);
batchSize += entrySize;
}
- if (batchSize > 0) {
- replicateEntries(batch, index, timeout);
+
+ if (batchSize <= 0) {
+ return CompletableFuture.completedFuture(batchIndex);
}
- return batchIndex;
+ final CompletableFuture<Integer> resultCompletableFuture = new CompletableFuture<Integer>();
+ FutureUtils.addListener(replicateEntries(batch, batchIndex, timeout), (response, exception) -> {
+ if (exception != null) {
+ resultCompletableFuture.completeExceptionally(exception);
+ return;
+ }
+ if (!walEntryPeekingIterator.hasNext()) {
+ resultCompletableFuture.complete(batchIndex);
+ return;
+ }
+ FutureUtils.addListener(
+ serialReplicateRegionEntries(walEntryPeekingIterator, batchIndex, timeout),
+ (currentResponse, currentException) -> {
+ if (currentException != null) {
+ resultCompletableFuture.completeExceptionally(currentException);
+ return;
+ }
+ resultCompletableFuture.complete(batchIndex);
+ });
+ });
+ return resultCompletableFuture;
}
- protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
+ /**
+ * Replicate entries to peer cluster by async API.
+ */
+ protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int batchIndex,
+ int timeout) {
return isSerial
- ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
- : () -> replicateEntries(entries, batchIndex, timeout);
+ ? serialReplicateRegionEntries(Iterators.peekingIterator(entries.iterator()), batchIndex,
+ timeout)
+ : replicateEntries(entries, batchIndex, timeout);
}
private String logPeerId() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index 011f0a19f0c..e82d69826d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
@@ -267,14 +268,14 @@ public class SyncReplicationTestBase {
new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
}
if (!expectedRejection) {
- ReplicationProtobufUtil.replicateWALEntry(
+ FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
- HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+ HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
} else {
try {
- ReplicationProtobufUtil.replicateWALEntry(
+ FutureUtils.get(ReplicationProtobufUtil.replicateWALEntry(
connection.getRegionServerAdmin(regionServer.getServerName()), entries, null, null, null,
- HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
+ HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT));
fail("Should throw IOException when sync-replication state is in A or DA");
} catch (RemoteException e) {
assertRejection(e.unwrapRemoteException());
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index 53512ec2af8..9bc632e223b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -30,7 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.UUID;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -556,15 +556,16 @@ public class TestReplicationEndpoint extends TestReplicationBase {
}
@Override
- protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
+ protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+ int timeout) {
// Fail only once, we don't want to slow down the test.
if (failedOnce) {
- return () -> ordinal;
+ return CompletableFuture.completedFuture(ordinal);
} else {
failedOnce = true;
- return () -> {
- throw new IOException("Sample Exception: Failed to replicate.");
- };
+ CompletableFuture<Integer> future = new CompletableFuture<Integer>();
+ future.completeExceptionally(new IOException("Sample Exception: Failed to replicate."));
+ return future;
}
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
index 803c4278f97..c48755fb5f0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -21,7 +21,7 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -228,15 +228,15 @@ public class TestReplicator extends TestReplicationBase {
}
@Override
- protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
- return () -> {
- int batchIndex = replicateEntries(entries, ordinal, timeout);
+ protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+ int timeout) {
+ return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
entriesCount += entries.size();
int count = batchCount.incrementAndGet();
LOG.info(
"Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
- return batchIndex;
- };
+ });
+
}
}
@@ -245,20 +245,23 @@ public class TestReplicator extends TestReplicationBase {
private final AtomicBoolean failNext = new AtomicBoolean(false);
@Override
- protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
- return () -> {
- if (failNext.compareAndSet(false, true)) {
- int batchIndex = replicateEntries(entries, ordinal, timeout);
+ protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+ int timeout) {
+
+ if (failNext.compareAndSet(false, true)) {
+ return replicateEntries(entries, ordinal, timeout).whenComplete((response, exception) -> {
entriesCount += entries.size();
int count = batchCount.incrementAndGet();
LOG.info(
"Completed replicating batch " + System.identityHashCode(entries) + " count=" + count);
- return batchIndex;
- } else if (failNext.compareAndSet(true, false)) {
- throw new ServiceException("Injected failure");
- }
- return ordinal;
- };
+ });
+ } else if (failNext.compareAndSet(true, false)) {
+ CompletableFuture<Integer> future = new CompletableFuture<Integer>();
+ future.completeExceptionally(new ServiceException("Injected failure"));
+ return future;
+ }
+ return CompletableFuture.completedFuture(ordinal);
+
}
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
index c0eace0bbda..5f99b88e0a4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
@@ -22,7 +22,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
@@ -165,11 +165,10 @@ public class TestSerialReplicationEndpoint {
}
@Override
- protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
- return () -> {
- entryQueue.addAll(entries);
- return ordinal;
- };
+ protected CompletableFuture<Integer> asyncReplicate(List<Entry> entries, int ordinal,
+ int timeout) {
+ entryQueue.addAll(entries);
+ return CompletableFuture.completedFuture(ordinal);
}
@Override