You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/01/11 06:44:35 UTC
[hbase] branch branch-2.0 updated: HBASE-21663 Add replica scan
support
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch branch-2.0
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.0 by this push:
new 5ab2bcf HBASE-21663 Add replica scan support
5ab2bcf is described below
commit 5ab2bcf67c75d6682a747b6bc9324901c00d1b20
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Fri Jan 11 10:49:33 2019 +0800
HBASE-21663 Add replica scan support
---
.../client/AsyncAdminRequestRetryingCaller.java | 15 +-
.../hbase/client/AsyncBatchRpcRetryingCaller.java | 6 +-
.../hadoop/hbase/client/AsyncClientScanner.java | 103 +++++++-----
.../hbase/client/AsyncConnectionConfiguration.java | 21 +++
.../hadoop/hbase/client/AsyncConnectionImpl.java | 5 +-
.../AsyncMasterRequestRpcRetryingCaller.java | 16 +-
.../hbase/client/AsyncRpcRetryingCaller.java | 15 +-
.../client/AsyncRpcRetryingCallerFactory.java | 6 +-
.../AsyncScanSingleRegionRpcRetryingCaller.java | 9 +-
.../AsyncServerRequestRpcRetryingCaller.java | 16 +-
.../AsyncSingleRequestRpcRetryingCaller.java | 10 +-
.../hbase/client/ConnectionConfiguration.java | 9 +-
.../hadoop/hbase/client/ConnectionUtils.java | 101 ++++++++++-
.../hadoop/hbase/client/RawAsyncTableImpl.java | 93 ++--------
... AbstractTestAsyncTableRegionReplicasRead.java} | 102 +++++------
.../client/TestAsyncTableRegionReplicasGet.java | 187 +--------------------
.../client/TestAsyncTableRegionReplicasScan.java | 76 +++++++++
17 files changed, 370 insertions(+), 420 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
index 2d634b9..cf31d79 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminRequestRetryingCaller.java
@@ -19,11 +19,12 @@ package org.apache.hadoop.hbase.client;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
/**
@@ -40,7 +41,7 @@ public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T
private final Callable<T> callable;
private ServerName serverName;
- public AsyncAdminRequestRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+ public AsyncAdminRequestRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
@@ -69,10 +70,4 @@ public class AsyncAdminRequestRetryingCaller<T> extends AsyncRpcRetryingCaller<T
future.complete(result);
});
}
-
- @Override
- CompletableFuture<T> call() {
- doCall();
- return future;
- }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index e268b2e..55590bd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -56,7 +56,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@@ -80,7 +80,7 @@ class AsyncBatchRpcRetryingCaller<T> {
private static final Logger LOG = LoggerFactory.getLogger(AsyncBatchRpcRetryingCaller.class);
- private final HashedWheelTimer retryTimer;
+ private final Timer retryTimer;
private final AsyncConnectionImpl conn;
@@ -130,7 +130,7 @@ class AsyncBatchRpcRetryingCaller<T> {
}
}
- public AsyncBatchRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+ public AsyncBatchRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
TableName tableName, List<? extends Row> actions, long pauseNs, int maxAttempts,
long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index ac2d3d7..6d4aefd 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -19,17 +19,27 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.createScanResultCache;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.getLocateType;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCCallsMetrics;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.incRPCRetriesMetrics;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.incRegionCountMetrics;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.isRemote;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
+
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService.Interface;
@@ -59,6 +69,8 @@ class AsyncClientScanner {
private final AsyncConnectionImpl conn;
+ private final Timer retryTimer;
+
private final long pauseNs;
private final int maxAttempts;
@@ -72,7 +84,7 @@ class AsyncClientScanner {
private final ScanResultCache resultCache;
public AsyncClientScanner(Scan scan, AdvancedScanResultConsumer consumer, TableName tableName,
- AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long scanTimeoutNs,
+ AsyncConnectionImpl conn, Timer retryTimer, long pauseNs, int maxAttempts, long scanTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
if (scan.getStartRow() == null) {
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
@@ -84,6 +96,7 @@ class AsyncClientScanner {
this.consumer = consumer;
this.tableName = tableName;
this.conn = conn;
+ this.retryTimer = retryTimer;
this.pauseNs = pauseNs;
this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
@@ -120,20 +133,19 @@ class AsyncClientScanner {
}
}
- private int openScannerTries;
+ private final AtomicInteger openScannerTries = new AtomicInteger();
private CompletableFuture<OpenScannerResponse> callOpenScanner(HBaseRpcController controller,
HRegionLocation loc, ClientService.Interface stub) {
boolean isRegionServerRemote = isRemote(loc.getHostname());
incRPCCallsMetrics(scanMetrics, isRegionServerRemote);
- if (openScannerTries > 1) {
+ if (openScannerTries.getAndIncrement() > 1) {
incRPCRetriesMetrics(scanMetrics, isRegionServerRemote);
}
- openScannerTries++;
CompletableFuture<OpenScannerResponse> future = new CompletableFuture<>();
try {
- ScanRequest request = RequestConverter.buildScanRequest(loc.getRegionInfo().getRegionName(),
- scan, scan.getCaching(), false);
+ ScanRequest request = RequestConverter.buildScanRequest(loc.getRegion().getRegionName(), scan,
+ scan.getCaching(), false);
stub.scan(controller, request, resp -> {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
@@ -148,40 +160,53 @@ class AsyncClientScanner {
}
private void startScan(OpenScannerResponse resp) {
- conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId()).location(resp.loc)
- .remote(resp.isRegionServerRemote)
- .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
- .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
- .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
- .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
- .start(resp.controller, resp.resp).whenComplete((hasMore, error) -> {
- if (error != null) {
- consumer.onError(error);
- return;
- }
- if (hasMore) {
- openScanner();
- } else {
- consumer.onComplete();
- }
- });
+ addListener(conn.callerFactory.scanSingleRegion().id(resp.resp.getScannerId())
+ .location(resp.loc).remote(resp.isRegionServerRemote)
+ .scannerLeaseTimeoutPeriod(resp.resp.getTtl(), TimeUnit.MILLISECONDS).stub(resp.stub)
+ .setScan(scan).metrics(scanMetrics).consumer(consumer).resultCache(resultCache)
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+ .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
+ .start(resp.controller, resp.resp), (hasMore, error) -> {
+ if (error != null) {
+ consumer.onError(error);
+ return;
+ }
+ if (hasMore) {
+ openScanner();
+ } else {
+ consumer.onComplete();
+ }
+ });
+ }
+
+ private CompletableFuture<OpenScannerResponse> openScanner(int replicaId) {
+ return conn.callerFactory.<OpenScannerResponse> single().table(tableName)
+ .row(scan.getStartRow()).replicaId(replicaId).locateType(getLocateType(scan))
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
+ .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
+ .call();
+ }
+
+ private long getPrimaryTimeoutNs() {
+ return TableName.isMetaTableName(tableName) ? conn.connConf.getPrimaryMetaScanTimeoutNs()
+ : conn.connConf.getPrimaryScanTimeoutNs();
}
private void openScanner() {
incRegionCountMetrics(scanMetrics);
- openScannerTries = 1;
- conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow())
- .locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
- .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
- .call().whenComplete((resp, error) -> {
- if (error != null) {
- consumer.onError(error);
- return;
- }
- startScan(resp);
- });
+ openScannerTries.set(1);
+ addListener(
+ timelineConsistentRead(conn.getLocator(), tableName, scan, scan.getStartRow(),
+ getLocateType(scan), this::openScanner, rpcTimeoutNs, getPrimaryTimeoutNs(), retryTimer),
+ (resp, error) -> {
+ if (error != null) {
+ consumer.onError(error);
+ return;
+ }
+ startScan(resp);
+ });
}
public void start() {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index fa051a5..84a5150 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -26,6 +26,8 @@ import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_CLIENT_SCANNER_TI
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_META_SCANNER_CACHING;
import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT;
+import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_OPERATION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_PAUSE;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
@@ -41,6 +43,8 @@ import static org.apache.hadoop.hbase.client.AsyncProcess.DEFAULT_START_LOG_ERRO
import static org.apache.hadoop.hbase.client.AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND;
+import static org.apache.hadoop.hbase.client.ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT;
import static org.apache.hadoop.hbase.client.ConnectionConfiguration.WRITE_BUFFER_SIZE_DEFAULT;
@@ -100,6 +104,10 @@ class AsyncConnectionConfiguration {
// timeout, we will send request to secondaries.
private final long primaryCallTimeoutNs;
+ private final long primaryScanTimeoutNs;
+
+ private final long primaryMetaScanTimeoutNs;
+
@SuppressWarnings("deprecation")
AsyncConnectionConfiguration(Configuration conf) {
this.metaOperationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
@@ -132,6 +140,11 @@ class AsyncConnectionConfiguration {
WRITE_BUFFER_PERIODIC_FLUSH_TIMEOUT_MS_DEFAULT));
this.primaryCallTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
conf.getLong(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT));
+ this.primaryScanTimeoutNs = TimeUnit.MICROSECONDS.toNanos(
+ conf.getLong(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT));
+ this.primaryMetaScanTimeoutNs =
+ TimeUnit.MICROSECONDS.toNanos(conf.getLong(HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
+ HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT));
}
long getMetaOperationTimeoutNs() {
@@ -193,4 +206,12 @@ class AsyncConnectionConfiguration {
long getPrimaryCallTimeoutNs() {
return primaryCallTimeoutNs;
}
+
+ long getPrimaryScanTimeoutNs() {
+ return primaryScanTimeoutNs;
+ }
+
+ long getPrimaryMetaScanTimeoutNs() {
+ return primaryMetaScanTimeoutNs;
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 612240f..fa66f3d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -259,7 +259,7 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override
public AsyncTable<AdvancedScanResultConsumer> build() {
- return new RawAsyncTableImpl(AsyncConnectionImpl.this, this);
+ return new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
}
};
}
@@ -271,7 +271,8 @@ class AsyncConnectionImpl implements AsyncConnection {
@Override
public AsyncTable<ScanResultConsumer> build() {
- RawAsyncTableImpl rawTable = new RawAsyncTableImpl(AsyncConnectionImpl.this, this);
+ RawAsyncTableImpl rawTable =
+ new RawAsyncTableImpl(AsyncConnectionImpl.this, RETRY_TIMER, this);
return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
}
};
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
index 1c8a0e1..a52e799 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncMasterRequestRpcRetryingCaller.java
@@ -17,12 +17,12 @@
*/
package org.apache.hadoop.hbase.client;
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-
import java.util.concurrent.CompletableFuture;
-
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
/**
@@ -39,7 +39,7 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
private final Callable<T> callable;
- public AsyncMasterRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+ public AsyncMasterRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
Callable<T> callable, long pauseNs, int maxRetries, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
super(retryTimer, conn, pauseNs, maxRetries, operationTimeoutNs, rpcTimeoutNs,
@@ -66,10 +66,4 @@ public class AsyncMasterRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
});
});
}
-
- @Override
- public CompletableFuture<T> call() {
- doCall();
- return future;
- }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index e03049a..5383ff8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -23,7 +23,6 @@ import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import java.util.ArrayList;
import java.util.List;
@@ -31,20 +30,21 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;
-
import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
@InterfaceAudience.Private
public abstract class AsyncRpcRetryingCaller<T> {
private static final Logger LOG = LoggerFactory.getLogger(AsyncRpcRetryingCaller.class);
- private final HashedWheelTimer retryTimer;
+ private final Timer retryTimer;
private final long startNs;
@@ -68,9 +68,8 @@ public abstract class AsyncRpcRetryingCaller<T> {
protected final HBaseRpcController controller;
- public AsyncRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
- long pauseNs, int maxAttempts, long operationTimeoutNs,
- long rpcTimeoutNs, int startLogErrorsCnt) {
+ public AsyncRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn, long pauseNs,
+ int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.pauseNs = pauseNs;
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index a660e74..f019fc4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -31,7 +31,7 @@ import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
@@ -45,9 +45,9 @@ class AsyncRpcRetryingCallerFactory {
private final AsyncConnectionImpl conn;
- private final HashedWheelTimer retryTimer;
+ private final Timer retryTimer;
- public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, HashedWheelTimer retryTimer) {
+ public AsyncRpcRetryingCallerFactory(AsyncConnectionImpl conn, Timer retryTimer) {
this.conn = conn;
this.retryTimer = retryTimer;
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 9fdb284..584bfac 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -34,7 +34,6 @@ import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
-
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.NotServingRegionException;
@@ -49,9 +48,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hbase.thirdparty.io.netty.util.Timeout;
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
@@ -72,7 +73,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
private static final Logger LOG =
LoggerFactory.getLogger(AsyncScanSingleRegionRpcRetryingCaller.class);
- private final HashedWheelTimer retryTimer;
+ private final Timer retryTimer;
private final Scan scan;
@@ -297,7 +298,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
}
}
- public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer,
+ public AsyncScanSingleRegionRpcRetryingCaller(Timer retryTimer,
AsyncConnectionImpl conn, Scan scan, ScanMetrics scanMetrics, long scannerId,
ScanResultCache resultCache, AdvancedScanResultConsumer consumer, Interface stub,
HRegionLocation loc, boolean isRegionServerRemote, long scannerLeaseTimeoutPeriodNs,
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
index 20b7c31..54b055a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncServerRequestRpcRetryingCaller.java
@@ -17,14 +17,14 @@
*/
package org.apache.hadoop.hbase.client;
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
-
import org.apache.hadoop.hbase.ServerName;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
/**
@@ -42,7 +42,7 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
private final Callable<T> callable;
private ServerName serverName;
- public AsyncServerRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+ public AsyncServerRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt, ServerName serverName, Callable<T> callable) {
super(retryTimer, conn, pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs,
@@ -71,10 +71,4 @@ public class AsyncServerRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCall
future.complete(result);
});
}
-
- @Override
- CompletableFuture<T> call() {
- doCall();
- return future;
- }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 1a52e5c..4b60b18 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
@@ -53,7 +53,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
private final Callable<T> callable;
- public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
+ public AsyncSingleRequestRpcRetryingCaller(Timer retryTimer, AsyncConnectionImpl conn,
TableName tableName, byte[] row, int replicaId, RegionLocateType locateType,
Callable<T> callable, long pauseNs, int maxAttempts, long operationTimeoutNs,
long rpcTimeoutNs, int startLogErrorsCnt) {
@@ -114,10 +114,4 @@ class AsyncSingleRequestRpcRetryingCaller<T> extends AsyncRpcRetryingCaller<T> {
call(loc);
});
}
-
- @Override
- public CompletableFuture<T> call() {
- doCall();
- return future;
- }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index 55c62e7..53859c2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -41,6 +41,9 @@ public class ConnectionConfiguration {
public static final String PRIMARY_CALL_TIMEOUT_MICROSECOND =
"hbase.client.primaryCallTimeout.get";
public static final int PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT = 10000; // 10ms
+ public static final String PRIMARY_SCAN_TIMEOUT_MICROSECOND =
+ "hbase.client.replicaCallTimeout.scan";
+ public static final int PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT = 1000000; // 1s
private final long writeBufferSize;
private final long writeBufferPeriodicFlushTimeoutMs;
@@ -92,11 +95,11 @@ public class ConnectionConfiguration {
conf.getInt(PRIMARY_CALL_TIMEOUT_MICROSECOND, PRIMARY_CALL_TIMEOUT_MICROSECOND_DEFAULT);
this.replicaCallTimeoutMicroSecondScan =
- conf.getInt("hbase.client.replicaCallTimeout.scan", 1000000); // 1000 ms
+ conf.getInt(PRIMARY_SCAN_TIMEOUT_MICROSECOND, PRIMARY_SCAN_TIMEOUT_MICROSECOND_DEFAULT);
this.metaReplicaCallTimeoutMicroSecondScan =
- conf.getInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
- HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT);
+ conf.getInt(HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT,
+ HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT);
this.retries = conf.getInt(
HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 63ef865..122d754 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.HConstants.EMPTY_END_ROW;
import static org.apache.hadoop.hbase.HConstants.EMPTY_START_ROW;
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import java.io.IOException;
import java.lang.reflect.UndeclaredThrowableException;
@@ -31,11 +32,13 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.PrivateCellUtil;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
@@ -53,6 +56,7 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
@@ -123,9 +127,8 @@ public final class ConnectionUtils {
}
/**
- * A ClusterConnection that will short-circuit RPC making direct invocations against the
- * localhost if the invocation target is 'this' server; save on network and protobuf
- * invocations.
+ * A ClusterConnection that will short-circuit RPC making direct invocations against the localhost
+ * if the invocation target is 'this' server; save on network and protobuf invocations.
*/
// TODO This has to still do PB marshalling/unmarshalling stuff. Check how/whether we can avoid.
@VisibleForTesting // Class is visible so can assert we are short-circuiting when expected.
@@ -136,8 +139,7 @@ public final class ConnectionUtils {
private ShortCircuitingClusterConnection(Configuration conf, ExecutorService pool, User user,
ServerName serverName, AdminService.BlockingInterface admin,
- ClientService.BlockingInterface client)
- throws IOException {
+ ClientService.BlockingInterface client) throws IOException {
super(conf, pool, user);
this.serverName = serverName;
this.localHostAdmin = admin;
@@ -157,7 +159,8 @@ public final class ConnectionUtils {
@Override
public MasterKeepAliveConnection getMaster() throws IOException {
if (this.localHostClient instanceof MasterService.BlockingInterface) {
- return new ShortCircuitMasterConnection((MasterService.BlockingInterface)this.localHostClient);
+ return new ShortCircuitMasterConnection(
+ (MasterService.BlockingInterface) this.localHostClient);
}
return super.getMaster();
}
@@ -335,8 +338,8 @@ public final class ConnectionUtils {
return result;
}
Cell[] rawCells = result.rawCells();
- int index =
- Arrays.binarySearch(rawCells, keepCellsAfter, CellComparator.getInstance()::compareWithoutRow);
+ int index = Arrays.binarySearch(rawCells, keepCellsAfter,
+ CellComparator.getInstance()::compareWithoutRow);
if (index < 0) {
index = -index - 1;
} else {
@@ -406,7 +409,7 @@ public final class ConnectionUtils {
static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
- .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
+ .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
}
public static ScanResultCache createScanResultCache(Scan scan) {
@@ -489,4 +492,84 @@ public final class ConnectionUtils {
}
scanMetrics.countOfRegions.incrementAndGet();
}
+
+ /**
+ * Connect the two futures, if the src future is done, then mark the dst future as done. And if
+ * the dst future is done, then cancel the src future. This is used for timeline consistent read.
+ */
+ private static <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture) {
+ addListener(srcFuture, (r, e) -> {
+ if (e != null) {
+ dstFuture.completeExceptionally(e);
+ } else {
+ dstFuture.complete(r);
+ }
+ });
+ // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture.
+ // Notice that this is a bit tricky, as the execution chain maybe 'complete src -> complete dst
+ // -> cancel src', for now it seems to be fine, as the will use CAS to set the result first in
+ // CompletableFuture. If later this causes problems, we could use whenCompleteAsync to break the
+ // tie.
+ addListener(dstFuture, (r, e) -> srcFuture.cancel(false));
+ }
+
+ private static <T> void sendRequestsToSecondaryReplicas(
+ Function<Integer, CompletableFuture<T>> requestReplica, RegionLocations locs,
+ CompletableFuture<T> future) {
+ if (future.isDone()) {
+ // do not send requests to secondary replicas if the future is done, i.e, the primary request
+ // has already been finished.
+ return;
+ }
+ for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) {
+ CompletableFuture<T> secondaryFuture = requestReplica.apply(replicaId);
+ connect(secondaryFuture, future);
+ }
+ }
+
+ static <T> CompletableFuture<T> timelineConsistentRead(AsyncRegionLocator locator,
+ TableName tableName, Query query, byte[] row, RegionLocateType locateType,
+ Function<Integer, CompletableFuture<T>> requestReplica, long rpcTimeoutNs,
+ long primaryCallTimeoutNs, Timer retryTimer) {
+ if (query.getConsistency() == Consistency.STRONG) {
+ return requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
+ }
+ // user specifies a replica id explicitly, just send request to the specific replica
+ if (query.getReplicaId() >= 0) {
+ return requestReplica.apply(query.getReplicaId());
+ }
+ // Timeline consistent read, where we may send requests to other region replicas
+ CompletableFuture<T> primaryFuture = requestReplica.apply(RegionReplicaUtil.DEFAULT_REPLICA_ID);
+ CompletableFuture<T> future = new CompletableFuture<>();
+ connect(primaryFuture, future);
+ long startNs = System.nanoTime();
+ // after the getRegionLocations, all the locations for the replicas of this region should have
+ // been cached, so it is not big deal to locate them again when actually sending requests to
+ // these replicas.
+ addListener(locator.getRegionLocations(tableName, row, locateType, false, rpcTimeoutNs),
+ (locs, error) -> {
+ if (error != null) {
+ LOG.warn(
+ "Failed to locate all the replicas for table={}, row='{}', locateType={}" +
+ " give up timeline consistent read",
+ tableName, Bytes.toStringBinary(row), locateType, error);
+ return;
+ }
+ if (locs.size() <= 1) {
+ LOG.warn(
+ "There are no secondary replicas for region {}, give up timeline consistent read",
+ locs.getDefaultRegionLocation().getRegion());
+ return;
+ }
+ long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs);
+ if (delayNs <= 0) {
+ sendRequestsToSecondaryReplicas(requestReplica, locs, future);
+ } else {
+ retryTimer.newTimeout(
+ timeout -> sendRequestsToSecondaryReplicas(requestReplica, locs, future), delayNs,
+ TimeUnit.NANOSECONDS);
+ }
+ });
+ return future;
+ }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 2ab9f6a..3a94566 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import static org.apache.hadoop.hbase.client.ConnectionUtils.isEmptyStopRow;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.timelineConsistentRead;
import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
import com.google.protobuf.RpcChannel;
@@ -36,7 +37,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncRpcRetryingCallerFactory.SingleRequestCallerBuilder;
import org.apache.hadoop.hbase.filter.BinaryComparator;
@@ -45,11 +45,10 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
+import org.apache.hbase.thirdparty.io.netty.util.Timer;
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.RequestConverter;
@@ -77,10 +76,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.CompareType
@InterfaceAudience.Private
class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
- private static final Logger LOG = LoggerFactory.getLogger(RawAsyncTableImpl.class);
-
private final AsyncConnectionImpl conn;
+ private final Timer retryTimer;
+
private final TableName tableName;
private final int defaultScannerCaching;
@@ -103,8 +102,9 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
private final int startLogErrorsCnt;
- RawAsyncTableImpl(AsyncConnectionImpl conn, AsyncTableBuilderBase<?> builder) {
+ RawAsyncTableImpl(AsyncConnectionImpl conn, Timer retryTimer, AsyncTableBuilderBase<?> builder) {
this.conn = conn;
+ this.retryTimer = retryTimer;
this.tableName = builder.tableName;
this.rpcTimeoutNs = builder.rpcTimeoutNs;
this.readRpcTimeoutNs = builder.readRpcTimeoutNs;
@@ -219,8 +219,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
return newCaller(row.getRow(), rpcTimeoutNs);
}
- private CompletableFuture<Result> get(Get get, int replicaId, long timeoutNs) {
- return this.<Result> newCaller(get, timeoutNs)
+ private CompletableFuture<Result> get(Get get, int replicaId) {
+ return this.<Result> newCaller(get, readRpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl
.<Get, GetRequest, GetResponse, Result> call(controller, loc, stub, get,
RequestConverter::buildGetRequest, (s, c, req, done) -> s.get(c, req, done),
@@ -228,78 +228,11 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
.replicaId(replicaId).call();
}
- // Connect the two futures, if the src future is done, then mark the dst future as done. And if
- // the dst future is done, then cancel the src future. This is used for timeline consistent read.
- private <T> void connect(CompletableFuture<T> srcFuture, CompletableFuture<T> dstFuture) {
- addListener(srcFuture, (r, e) -> {
- if (e != null) {
- dstFuture.completeExceptionally(e);
- } else {
- dstFuture.complete(r);
- }
- });
- // The cancellation may be a dummy one as the dstFuture may be completed by this srcFuture.
- // Notice that this is a bit tricky, as the execution chain maybe 'complete src -> complete dst
- // -> cancel src', for now it seems to be fine, as the will use CAS to set the result first in
- // CompletableFuture. If later this causes problems, we could use whenCompleteAsync to break the
- // tie.
- addListener(dstFuture, (r, e) -> srcFuture.cancel(false));
- }
-
- private void timelineConsistentGet(Get get, RegionLocations locs,
- CompletableFuture<Result> future) {
- if (future.isDone()) {
- // do not send requests to secondary replicas if the future is done, i.e, the primary request
- // has already been finished.
- return;
- }
- for (int replicaId = 1, n = locs.size(); replicaId < n; replicaId++) {
- CompletableFuture<Result> secondaryFuture = get(get, replicaId, readRpcTimeoutNs);
- connect(secondaryFuture, future);
- }
- }
-
@Override
public CompletableFuture<Result> get(Get get) {
- if (get.getConsistency() == Consistency.STRONG) {
- return get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
- }
- // user specifies a replica id explicitly, just send request to the specific replica
- if (get.getReplicaId() >= 0) {
- return get(get, get.getReplicaId(), readRpcTimeoutNs);
- }
-
- // Timeline consistent read, where we may send requests to other region replicas
- CompletableFuture<Result> primaryFuture =
- get(get, RegionReplicaUtil.DEFAULT_REPLICA_ID, readRpcTimeoutNs);
- CompletableFuture<Result> future = new CompletableFuture<>();
- connect(primaryFuture, future);
- long primaryCallTimeoutNs = conn.connConf.getPrimaryCallTimeoutNs();
- long startNs = System.nanoTime();
- addListener(conn.getLocator().getRegionLocations(tableName, get.getRow(),
- RegionLocateType.CURRENT, false, readRpcTimeoutNs), (locs, error) -> {
- if (error != null) {
- LOG.warn(
- "Failed to locate all the replicas for table={}, row='{}'," +
- " give up timeline consistent read",
- tableName, Bytes.toStringBinary(get.getRow()), error);
- return;
- }
- if (locs.size() <= 1) {
- LOG.warn(
- "There are no secondary replicas for region {}," + " give up timeline consistent read",
- locs.getDefaultRegionLocation().getRegion());
- return;
- }
- long delayNs = primaryCallTimeoutNs - (System.nanoTime() - startNs);
- if (delayNs <= 0) {
- timelineConsistentGet(get, locs, future);
- } else {
- AsyncConnectionImpl.RETRY_TIMER.newTimeout(
- timeout -> timelineConsistentGet(get, locs, future), delayNs, TimeUnit.NANOSECONDS);
- }
- });
- return future;
+ return timelineConsistentRead(conn.getLocator(), tableName, get, get.getRow(),
+ RegionLocateType.CURRENT, replicaId -> get(get, replicaId), readRpcTimeoutNs,
+ conn.connConf.getPrimaryCallTimeoutNs(), retryTimer);
}
@Override
@@ -494,8 +427,8 @@ class RawAsyncTableImpl implements AsyncTable<AdvancedScanResultConsumer> {
}
public void scan(Scan scan, AdvancedScanResultConsumer consumer) {
- new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, pauseNs,
- maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
+ new AsyncClientScanner(setDefaultScanConfig(scan), consumer, tableName, conn, retryTimer,
+ pauseNs, maxAttempts, scanTimeoutNs, readRpcTimeoutNs, startLogErrorsCnt).start();
}
private long resultSize2CacheSize(long maxResultSize) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableRegionReplicasRead.java
similarity index 72%
copy from hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
copy to hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableRegionReplicasRead.java
index 2117116..c70af51 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/AbstractTestAsyncTableRegionReplicasRead.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
@@ -32,7 +31,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
@@ -41,45 +39,32 @@ import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.testclassification.ClientTests;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
-import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameter;
import org.junit.runners.Parameterized.Parameters;
-@RunWith(Parameterized.class)
-@Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncTableRegionReplicasGet {
+public abstract class AbstractTestAsyncTableRegionReplicasRead {
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasGet.class);
+ protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ protected static TableName TABLE_NAME = TableName.valueOf("async");
- private static TableName TABLE_NAME = TableName.valueOf("async");
+ protected static byte[] FAMILY = Bytes.toBytes("cf");
- private static byte[] FAMILY = Bytes.toBytes("cf");
+ protected static byte[] QUALIFIER = Bytes.toBytes("cq");
- private static byte[] QUALIFIER = Bytes.toBytes("cq");
+ protected static byte[] ROW = Bytes.toBytes("row");
- private static byte[] ROW = Bytes.toBytes("row");
+ protected static byte[] VALUE = Bytes.toBytes("value");
- private static byte[] VALUE = Bytes.toBytes("value");
+ protected static int REPLICA_COUNT = 3;
- private static int REPLICA_COUNT = 3;
-
- private static AsyncConnection ASYNC_CONN;
+ protected static AsyncConnection ASYNC_CONN;
@Rule
public TestName testName = new TestName();
@@ -97,13 +82,14 @@ public class TestAsyncTableRegionReplicasGet {
@Parameters
public static List<Object[]> params() {
- return Arrays.asList(new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getRawTable },
- new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getTable });
+ return Arrays.asList(
+ new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getRawTable },
+ new Supplier<?>[] { AbstractTestAsyncTableRegionReplicasRead::getTable });
}
- private static volatile boolean FAIL_PRIMARY_GET = false;
+ protected static volatile boolean FAIL_PRIMARY_GET = false;
- private static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
+ protected static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
new ConcurrentHashMap<>();
public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
@@ -113,9 +99,8 @@ public class TestAsyncTableRegionReplicasGet {
return Optional.of(this);
}
- @Override
- public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
- List<Cell> result) throws IOException {
+ private void recordAndTryFail(ObserverContext<RegionCoprocessorEnvironment> c)
+ throws IOException {
RegionInfo region = c.getEnvironment().getRegionInfo();
if (!region.getTable().equals(TABLE_NAME)) {
return;
@@ -126,12 +111,24 @@ public class TestAsyncTableRegionReplicasGet {
throw new IOException("Inject error");
}
}
+
+ @Override
+ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
+ List<Cell> result) throws IOException {
+ recordAndTryFail(c);
+ }
+
+ @Override
+ public void preScannerOpen(ObserverContext<RegionCoprocessorEnvironment> c, Scan scan)
+ throws IOException {
+ recordAndTryFail(c);
+ }
}
- private static boolean allReplicasHaveRow() throws IOException {
+ private static boolean allReplicasHaveRow(byte[] row) throws IOException {
for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
- if (region.get(new Get(ROW), false).isEmpty()) {
+ if (region.get(new Get(row), false).isEmpty()) {
return false;
}
}
@@ -139,30 +136,40 @@ public class TestAsyncTableRegionReplicasGet {
return true;
}
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
+ protected static void startClusterAndCreateTable() throws Exception {
// 10 mins
TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
TimeUnit.MINUTES.toMillis(10));
+ TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ TimeUnit.MINUTES.toMillis(10));
+ TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD,
+ TimeUnit.MINUTES.toMillis(10));
+
// 1 second
TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND,
TimeUnit.SECONDS.toMicros(1));
+ TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_SCAN_TIMEOUT_MICROSECOND,
+ TimeUnit.SECONDS.toMicros(1));
+
// set a small pause so we will retry very quickly
TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
+
// infinite retry
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);
+
TEST_UTIL.startMiniCluster(3);
TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
.setCoprocessor(FailPrimaryGetCP.class.getName()).build());
TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
- AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
- table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
+ }
+
+ protected static void waitUntilAllReplicasHaveRow(byte[] row) throws IOException {
// this is the fastest way to let all replicas have the row
TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
- TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow());
+ TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow(row));
}
@AfterClass
@@ -171,26 +178,26 @@ public class TestAsyncTableRegionReplicasGet {
TEST_UTIL.shutdownMiniCluster();
}
- private static int getSecondaryGetCount() {
+ protected static int getSecondaryGetCount() {
return REPLICA_ID_TO_COUNT.entrySet().stream()
.filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
.mapToInt(e -> e.getValue().get()).sum();
}
- private static int getPrimaryGetCount() {
+ protected static int getPrimaryGetCount() {
AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID);
return primaryGetCount != null ? primaryGetCount.get() : 0;
}
+ // replicaId = -1 means do not set replica
+ protected abstract void readAndCheck(AsyncTable<?> table, int replicaId) throws Exception;
+
@Test
public void testNoReplicaRead() throws Exception {
FAIL_PRIMARY_GET = false;
REPLICA_ID_TO_COUNT.clear();
AsyncTable<?> table = getTable.get();
- Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
- for (int i = 0; i < 1000; i++) {
- assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
- }
+ readAndCheck(table, -1);
// the primary region is fine and the primary timeout is 1 second which is long enough, so we
// should not send any requests to secondary replicas even if the consistency is timeline.
Thread.sleep(5000);
@@ -202,10 +209,9 @@ public class TestAsyncTableRegionReplicasGet {
// fail the primary get request
FAIL_PRIMARY_GET = true;
REPLICA_ID_TO_COUNT.clear();
- Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
// make sure that we could still get the value from secondary replicas
AsyncTable<?> table = getTable.get();
- assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
+ readAndCheck(table, -1);
// make sure that the primary request has been canceled
Thread.sleep(5000);
int count = getPrimaryGetCount();
@@ -217,11 +223,9 @@ public class TestAsyncTableRegionReplicasGet {
public void testReadSpecificReplica() throws Exception {
FAIL_PRIMARY_GET = false;
REPLICA_ID_TO_COUNT.clear();
- Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
AsyncTable<?> table = getTable.get();
for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
- get.setReplicaId(replicaId);
- assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
+ readAndCheck(table, replicaId);
assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get());
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
index 2117116..3e1d994 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasGet.java
@@ -18,211 +18,38 @@
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ForkJoinPool;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.Supplier;
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
-import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.RegionObserver;
-import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
-import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameter;
-import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
@Category({ MediumTests.class, ClientTests.class })
-public class TestAsyncTableRegionReplicasGet {
+public class TestAsyncTableRegionReplicasGet extends AbstractTestAsyncTableRegionReplicasRead {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasGet.class);
- private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-
- private static TableName TABLE_NAME = TableName.valueOf("async");
-
- private static byte[] FAMILY = Bytes.toBytes("cf");
-
- private static byte[] QUALIFIER = Bytes.toBytes("cq");
-
- private static byte[] ROW = Bytes.toBytes("row");
-
- private static byte[] VALUE = Bytes.toBytes("value");
-
- private static int REPLICA_COUNT = 3;
-
- private static AsyncConnection ASYNC_CONN;
-
- @Rule
- public TestName testName = new TestName();
-
- @Parameter
- public Supplier<AsyncTable<?>> getTable;
-
- private static AsyncTable<?> getRawTable() {
- return ASYNC_CONN.getTable(TABLE_NAME);
- }
-
- private static AsyncTable<?> getTable() {
- return ASYNC_CONN.getTable(TABLE_NAME, ForkJoinPool.commonPool());
- }
-
- @Parameters
- public static List<Object[]> params() {
- return Arrays.asList(new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getRawTable },
- new Supplier<?>[] { TestAsyncTableRegionReplicasGet::getTable });
- }
-
- private static volatile boolean FAIL_PRIMARY_GET = false;
-
- private static ConcurrentMap<Integer, AtomicInteger> REPLICA_ID_TO_COUNT =
- new ConcurrentHashMap<>();
-
- public static final class FailPrimaryGetCP implements RegionObserver, RegionCoprocessor {
-
- @Override
- public Optional<RegionObserver> getRegionObserver() {
- return Optional.of(this);
- }
-
- @Override
- public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
- List<Cell> result) throws IOException {
- RegionInfo region = c.getEnvironment().getRegionInfo();
- if (!region.getTable().equals(TABLE_NAME)) {
- return;
- }
- REPLICA_ID_TO_COUNT.computeIfAbsent(region.getReplicaId(), k -> new AtomicInteger())
- .incrementAndGet();
- if (region.getRegionId() == RegionReplicaUtil.DEFAULT_REPLICA_ID && FAIL_PRIMARY_GET) {
- throw new IOException("Inject error");
- }
- }
- }
-
- private static boolean allReplicasHaveRow() throws IOException {
- for (RegionServerThread t : TEST_UTIL.getMiniHBaseCluster().getRegionServerThreads()) {
- for (HRegion region : t.getRegionServer().getRegions(TABLE_NAME)) {
- if (region.get(new Get(ROW), false).isEmpty()) {
- return false;
- }
- }
- }
- return true;
- }
-
@BeforeClass
public static void setUpBeforeClass() throws Exception {
- // 10 mins
- TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
- TimeUnit.MINUTES.toMillis(10));
- // 1 second
- TEST_UTIL.getConfiguration().setLong(ConnectionConfiguration.PRIMARY_CALL_TIMEOUT_MICROSECOND,
- TimeUnit.SECONDS.toMicros(1));
- // set a small pause so we will retry very quickly
- TEST_UTIL.getConfiguration().setLong(HConstants.HBASE_CLIENT_PAUSE, 10);
- // infinite retry
- TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, Integer.MAX_VALUE);
- TEST_UTIL.startMiniCluster(3);
- TEST_UTIL.getAdmin().createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
- .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).setRegionReplication(REPLICA_COUNT)
- .setCoprocessor(FailPrimaryGetCP.class.getName()).build());
- TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME);
- ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
+ startClusterAndCreateTable();
AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
- // this is the fastest way to let all replicas have the row
- TEST_UTIL.getAdmin().disableTable(TABLE_NAME);
- TEST_UTIL.getAdmin().enableTable(TABLE_NAME);
- TEST_UTIL.waitFor(30000, () -> allReplicasHaveRow());
+ waitUntilAllReplicasHaveRow(ROW);
}
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- IOUtils.closeQuietly(ASYNC_CONN);
- TEST_UTIL.shutdownMiniCluster();
- }
-
- private static int getSecondaryGetCount() {
- return REPLICA_ID_TO_COUNT.entrySet().stream()
- .filter(e -> e.getKey().intValue() != RegionReplicaUtil.DEFAULT_REPLICA_ID)
- .mapToInt(e -> e.getValue().get()).sum();
- }
-
- private static int getPrimaryGetCount() {
- AtomicInteger primaryGetCount = REPLICA_ID_TO_COUNT.get(RegionReplicaUtil.DEFAULT_REPLICA_ID);
- return primaryGetCount != null ? primaryGetCount.get() : 0;
- }
-
- @Test
- public void testNoReplicaRead() throws Exception {
- FAIL_PRIMARY_GET = false;
- REPLICA_ID_TO_COUNT.clear();
- AsyncTable<?> table = getTable.get();
- Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
- for (int i = 0; i < 1000; i++) {
- assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
- }
- // the primary region is fine and the primary timeout is 1 second which is long enough, so we
- // should not send any requests to secondary replicas even if the consistency is timeline.
- Thread.sleep(5000);
- assertEquals(0, getSecondaryGetCount());
- }
-
- @Test
- public void testReplicaRead() throws Exception {
- // fail the primary get request
- FAIL_PRIMARY_GET = true;
- REPLICA_ID_TO_COUNT.clear();
+ @Override
+ protected void readAndCheck(AsyncTable<?> table, int replicaId) throws Exception {
Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
- // make sure that we could still get the value from secondary replicas
- AsyncTable<?> table = getTable.get();
- assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
- // make sure that the primary request has been canceled
- Thread.sleep(5000);
- int count = getPrimaryGetCount();
- Thread.sleep(10000);
- assertEquals(count, getPrimaryGetCount());
- }
-
- @Test
- public void testReadSpecificReplica() throws Exception {
- FAIL_PRIMARY_GET = false;
- REPLICA_ID_TO_COUNT.clear();
- Get get = new Get(ROW).setConsistency(Consistency.TIMELINE);
- AsyncTable<?> table = getTable.get();
- for (int replicaId = 0; replicaId < REPLICA_COUNT; replicaId++) {
+ if (replicaId >= 0) {
get.setReplicaId(replicaId);
- assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
- assertEquals(1, REPLICA_ID_TO_COUNT.get(replicaId).get());
}
+ assertArrayEquals(VALUE, table.get(get).get().getValue(FAMILY, QUALIFIER));
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasScan.java
new file mode 100644
index 0000000..dd5c8e5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableRegionReplicasScan.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableRegionReplicasScan extends AbstractTestAsyncTableRegionReplicasRead {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestAsyncTableRegionReplicasScan.class);
+
+ private static int ROW_COUNT = 1000;
+
+ private static byte[] getRow(int i) {
+ return Bytes.toBytes(String.format("%s-%03d", Bytes.toString(ROW), i));
+ }
+
+ private static byte[] getValue(int i) {
+ return Bytes.toBytes(String.format("%s-%03d", Bytes.toString(VALUE), i));
+ }
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ startClusterAndCreateTable();
+ AsyncTable<?> table = ASYNC_CONN.getTable(TABLE_NAME);
+ for (int i = 0; i < ROW_COUNT; i++) {
+ table.put(new Put(getRow(i)).addColumn(FAMILY, QUALIFIER, getValue(i))).get();
+ }
+ waitUntilAllReplicasHaveRow(getRow(ROW_COUNT - 1));
+ }
+
+ @Override
+ protected void readAndCheck(AsyncTable<?> table, int replicaId) throws IOException {
+ Scan scan = new Scan().setConsistency(Consistency.TIMELINE).setCaching(1);
+ if (replicaId >= 0) {
+ scan.setReplicaId(replicaId);
+ }
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ for (int i = 0; i < 1000; i++) {
+ Result result = scanner.next();
+ assertNotNull(result);
+ assertArrayEquals(getValue(i), result.getValue(FAMILY, QUALIFIER));
+ }
+ }
+ }
+}