You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2017/01/17 06:34:18 UTC
hbase git commit: HBASE-17372 Make AsyncTable thread safe
Repository: hbase
Updated Branches:
refs/heads/master 4cb09a494 -> 4ab95ebbc
HBASE-17372 Make AsyncTable thread safe
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4ab95ebb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4ab95ebb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4ab95ebb
Branch: refs/heads/master
Commit: 4ab95ebbceb144d90e03bce45afa52bcb8c62c54
Parents: 4cb09a4
Author: zhangduo <zh...@apache.org>
Authored: Tue Jan 17 09:55:23 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue Jan 17 14:33:28 2017 +0800
----------------------------------------------------------------------
.../client/AsyncBatchRpcRetryingCaller.java | 54 ++-------
.../hadoop/hbase/client/AsyncClientScanner.java | 21 +++-
.../hadoop/hbase/client/AsyncConnection.java | 39 ++++--
.../client/AsyncConnectionConfiguration.java | 18 ++-
.../hbase/client/AsyncConnectionImpl.java | 27 +++--
.../hadoop/hbase/client/AsyncRegionLocator.java | 2 +-
.../client/AsyncRpcRetryingCallerFactory.java | 102 ++++++++++++----
.../AsyncScanSingleRegionRpcRetryingCaller.java | 4 +-
.../AsyncSingleRequestRpcRetryingCaller.java | 5 +-
.../client/AsyncSmallScanRpcRetryingCaller.java | 15 ++-
.../apache/hadoop/hbase/client/AsyncTable.java | 6 +-
.../hadoop/hbase/client/AsyncTableBase.java | 118 ++++++-------------
.../hadoop/hbase/client/AsyncTableBuilder.java | 113 ++++++++++++++++++
.../hbase/client/AsyncTableBuilderBase.java | 111 +++++++++++++++++
.../hadoop/hbase/client/AsyncTableImpl.java | 42 +++----
.../hadoop/hbase/client/ConnectionUtils.java | 15 +--
.../hadoop/hbase/client/RawAsyncTable.java | 2 +
.../hadoop/hbase/client/RawAsyncTableImpl.java | 113 ++++++++++--------
.../org/apache/hadoop/hbase/HConstants.java | 3 -
...TestAsyncSingleRequestRpcRetryingCaller.java | 52 +++-----
.../client/TestAsyncTableGetMultiThreaded.java | 28 ++---
.../hbase/client/TestRawAsyncTableScan.java | 6 +-
22 files changed, 583 insertions(+), 313 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
index 6f0b8e9..9b362d1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncBatchRpcRetryingCaller.java
@@ -21,7 +21,6 @@ import static org.apache.hadoop.hbase.CellUtil.createCellScanner;
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import static org.apache.hadoop.hbase.util.CollectionUtils.computeIfAbsent;
@@ -40,7 +39,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -61,7 +59,6 @@ import org.apache.hadoop.hbase.shaded.protobuf.ResponseConverter;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
-import org.apache.hadoop.hbase.util.AtomicUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -102,9 +99,7 @@ class AsyncBatchRpcRetryingCaller<T> {
private final long operationTimeoutNs;
- private final long readRpcTimeoutNs;
-
- private final long writeRpcTimeoutNs;
+ private final long rpcTimeoutNs;
private final int startLogErrorsCnt;
@@ -128,39 +123,22 @@ class AsyncBatchRpcRetryingCaller<T> {
public final ConcurrentMap<byte[], RegionRequest> actionsByRegion =
new ConcurrentSkipListMap<>(Bytes.BYTES_COMPARATOR);
- public final AtomicLong rpcTimeoutNs;
-
- public ServerRequest(long defaultRpcTimeoutNs) {
- this.rpcTimeoutNs = new AtomicLong(defaultRpcTimeoutNs);
- }
-
- public void addAction(HRegionLocation loc, Action action, long rpcTimeoutNs) {
+ public void addAction(HRegionLocation loc, Action action) {
computeIfAbsent(actionsByRegion, loc.getRegionInfo().getRegionName(),
() -> new RegionRequest(loc)).actions.add(action);
- // try update the timeout to a larger value
- if (this.rpcTimeoutNs.get() <= 0) {
- return;
- }
- if (rpcTimeoutNs <= 0) {
- this.rpcTimeoutNs.set(-1L);
- return;
- }
- AtomicUtils.updateMax(this.rpcTimeoutNs, rpcTimeoutNs);
}
}
public AsyncBatchRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
- TableName tableName, List<? extends Row> actions, long pauseNs, int maxRetries,
- long operationTimeoutNs, long readRpcTimeoutNs, long writeRpcTimeoutNs,
- int startLogErrorsCnt) {
+ TableName tableName, List<? extends Row> actions, long pauseNs, int maxAttempts,
+ long operationTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
this.tableName = tableName;
this.pauseNs = pauseNs;
- this.maxAttempts = retries2Attempts(maxRetries);
+ this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
- this.readRpcTimeoutNs = readRpcTimeoutNs;
- this.writeRpcTimeoutNs = writeRpcTimeoutNs;
+ this.rpcTimeoutNs = rpcTimeoutNs;
this.startLogErrorsCnt = startLogErrorsCnt;
this.actions = new ArrayList<>(actions.size());
@@ -366,7 +344,7 @@ class AsyncBatchRpcRetryingCaller<T> {
return;
}
HBaseRpcController controller = conn.rpcControllerFactory.newController();
- resetController(controller, Math.min(serverReq.rpcTimeoutNs.get(), remainingNs));
+ resetController(controller, Math.min(rpcTimeoutNs, remainingNs));
if (!cells.isEmpty()) {
controller.setCellScanner(createCellScanner(cells));
}
@@ -416,10 +394,6 @@ class AsyncBatchRpcRetryingCaller<T> {
retryTimer.newTimeout(t -> groupAndSend(actions, tries + 1), delayNs, TimeUnit.NANOSECONDS);
}
- private long getRpcTimeoutNs(Action action) {
- return action.getAction() instanceof Get ? readRpcTimeoutNs : writeRpcTimeoutNs;
- }
-
private void groupAndSend(Stream<Action> actions, int tries) {
long locateTimeoutNs;
if (operationTimeoutNs > 0) {
@@ -433,15 +407,6 @@ class AsyncBatchRpcRetryingCaller<T> {
}
ConcurrentMap<ServerName, ServerRequest> actionsByServer = new ConcurrentHashMap<>();
ConcurrentLinkedQueue<Action> locateFailed = new ConcurrentLinkedQueue<>();
- // use the small one as the default timeout value, and increase the timeout value if we have an
- // action in the group needs a larger timeout value.
- long defaultRpcTimeoutNs;
- if (readRpcTimeoutNs > 0) {
- defaultRpcTimeoutNs =
- writeRpcTimeoutNs > 0 ? Math.min(readRpcTimeoutNs, writeRpcTimeoutNs) : readRpcTimeoutNs;
- } else {
- defaultRpcTimeoutNs = writeRpcTimeoutNs > 0 ? writeRpcTimeoutNs : -1L;
- }
CompletableFuture.allOf(actions
.map(action -> conn.getLocator().getRegionLocation(tableName, action.getAction().getRow(),
RegionLocateType.CURRENT, locateTimeoutNs).whenComplete((loc, error) -> {
@@ -454,9 +419,8 @@ class AsyncBatchRpcRetryingCaller<T> {
addError(action, error, null);
locateFailed.add(action);
} else {
- computeIfAbsent(actionsByServer, loc.getServerName(),
- () -> new ServerRequest(defaultRpcTimeoutNs)).addAction(loc, action,
- getRpcTimeoutNs(action));
+ computeIfAbsent(actionsByServer, loc.getServerName(), ServerRequest::new)
+ .addAction(loc, action);
}
}))
.toArray(CompletableFuture[]::new)).whenComplete((v, r) -> {
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
index d7a3ed1..f656a6c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClientScanner.java
@@ -55,14 +55,21 @@ class AsyncClientScanner {
private final AsyncConnectionImpl conn;
+ private final long pauseNs;
+
+ private final int maxAttempts;
+
private final long scanTimeoutNs;
private final long rpcTimeoutNs;
+ private final int startLogErrorsCnt;
+
private final ScanResultCache resultCache;
public AsyncClientScanner(Scan scan, RawScanResultConsumer consumer, TableName tableName,
- AsyncConnectionImpl conn, long scanTimeoutNs, long rpcTimeoutNs) {
+ AsyncConnectionImpl conn, long pauseNs, int maxAttempts, long scanTimeoutNs,
+ long rpcTimeoutNs, int startLogErrorsCnt) {
if (scan.getStartRow() == null) {
scan.withStartRow(EMPTY_START_ROW, scan.includeStartRow());
}
@@ -73,8 +80,11 @@ class AsyncClientScanner {
this.consumer = consumer;
this.tableName = tableName;
this.conn = conn;
+ this.pauseNs = pauseNs;
+ this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
+ this.startLogErrorsCnt = startLogErrorsCnt;
this.resultCache = scan.getAllowPartialResults() || scan.getBatch() > 0
? new AllowPartialScanResultCache() : new CompleteScanResultCache();
}
@@ -117,7 +127,9 @@ class AsyncClientScanner {
conn.callerFactory.scanSingleRegion().id(resp.scannerId).location(resp.loc).stub(resp.stub)
.setScan(scan).consumer(consumer).resultCache(resultCache)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).start().whenComplete((hasMore, error) -> {
+ .scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).start()
+ .whenComplete((hasMore, error) -> {
if (error != null) {
consumer.onError(error);
return;
@@ -133,8 +145,9 @@ class AsyncClientScanner {
private void openScanner() {
conn.callerFactory.<OpenScannerResponse> single().table(tableName).row(scan.getStartRow())
.locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::callOpenScanner).call()
- .whenComplete((resp, error) -> {
+ .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).action(this::callOpenScanner)
+ .call().whenComplete((resp, error) -> {
if (error != null) {
consumer.onError(error);
return;
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
index 7b0f339..9f114ac 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnection.java
@@ -50,21 +50,32 @@ public interface AsyncConnection extends Closeable {
AsyncTableRegionLocator getRegionLocator(TableName tableName);
/**
- * Retrieve an RawAsyncTable implementation for accessing a table. The returned Table is not
- * thread safe, a new instance should be created for each using thread. This is a lightweight
- * operation, pooling or caching of the returned AsyncTable is neither required nor desired.
+ * Retrieve an {@link RawAsyncTable} implementation for accessing a table.
+ * <p>
+ * The returned instance will use default configs. Use {@link #getRawTableBuilder(TableName)} if you
+ * want to customize some configs.
* <p>
* This method no longer checks table existence. An exception will be thrown if the table does not
* exist only when the first operation is attempted.
* @param tableName the name of the table
* @return an RawAsyncTable to use for interactions with this table
+ * @see #getRawTableBuilder(TableName)
+ */
+ default RawAsyncTable getRawTable(TableName tableName) {
+ return getRawTableBuilder(tableName).build();
+ }
+
+ /**
+ * Returns an {@link AsyncTableBuilder} for creating {@link RawAsyncTable}.
+ * <p>
+ * This method no longer checks table existence. An exception will be thrown if the table does not
+ * exist only when the first operation is attempted.
+ * @param tableName the name of the table
*/
- RawAsyncTable getRawTable(TableName tableName);
+ AsyncTableBuilder<RawAsyncTable> getRawTableBuilder(TableName tableName);
/**
- * Retrieve an AsyncTable implementation for accessing a table. The returned Table is not thread
- * safe, a new instance should be created for each using thread. This is a lightweight operation,
- * pooling or caching of the returned AsyncTable is neither required nor desired.
+ * Retrieve an AsyncTable implementation for accessing a table.
* <p>
* This method no longer checks table existence. An exception will be thrown if the table does not
* exist only when the first operation is attempted.
@@ -72,5 +83,17 @@ public interface AsyncConnection extends Closeable {
* @param pool the thread pool to use for executing callback
* @return an AsyncTable to use for interactions with this table
*/
- AsyncTable getTable(TableName tableName, ExecutorService pool);
+ default AsyncTable getTable(TableName tableName, ExecutorService pool) {
+ return getTableBuilder(tableName, pool).build();
+ }
+
+ /**
+ * Returns an {@link AsyncTableBuilder} for creating {@link AsyncTable}.
+ * <p>
+ * This method no longer checks table existence. An exception will be thrown if the table does not
+ * exist only when the first operation is attempted.
+ * @param tableName the name of the table
+ * @param pool the thread pool to use for executing callback
+ */
+ AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool);
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
index 6279d46..585a104 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionConfiguration.java
@@ -56,6 +56,10 @@ class AsyncConnectionConfiguration {
// by this value, see scanTimeoutNs.
private final long operationTimeoutNs;
+ // timeout for each rpc request. Can be overridden by a more specific config, such as
+ // readRpcTimeout or writeRpcTimeout.
+ private final long rpcTimeoutNs;
+
// timeout for each read rpc request
private final long readRpcTimeoutNs;
@@ -85,10 +89,12 @@ class AsyncConnectionConfiguration {
conf.getLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
this.operationTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
conf.getLong(HBASE_CLIENT_OPERATION_TIMEOUT, DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
- this.readRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY,
- conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
- this.writeRpcTimeoutNs = TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY,
- conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT)));
+ this.rpcTimeoutNs = TimeUnit.MILLISECONDS
+ .toNanos(conf.getLong(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT));
+ this.readRpcTimeoutNs =
+ TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_READ_TIMEOUT_KEY, rpcTimeoutNs));
+ this.writeRpcTimeoutNs =
+ TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_RPC_WRITE_TIMEOUT_KEY, rpcTimeoutNs));
this.pauseNs =
TimeUnit.MILLISECONDS.toNanos(conf.getLong(HBASE_CLIENT_PAUSE, DEFAULT_HBASE_CLIENT_PAUSE));
this.maxRetries = conf.getInt(HBASE_CLIENT_RETRIES_NUMBER, DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
@@ -111,6 +117,10 @@ class AsyncConnectionConfiguration {
return operationTimeoutNs;
}
+ long getRpcTimeoutNs() {
+ return rpcTimeoutNs;
+ }
+
long getReadRpcTimeoutNs() {
return readRpcTimeoutNs;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index d660b02..c58500a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.CLUSTER_ID_DEFAULT;
-import static org.apache.hadoop.hbase.HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
-import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.client.ConnectionUtils.NO_NONCE_GENERATOR;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getStubKey;
import static org.apache.hadoop.hbase.client.NonceGenerator.CLIENT_NONCES_ENABLED_KEY;
@@ -90,7 +88,6 @@ class AsyncConnectionImpl implements AsyncConnection {
private final ConcurrentMap<String, ClientService.Interface> rsStubs = new ConcurrentHashMap<>();
- @SuppressWarnings("deprecation")
public AsyncConnectionImpl(Configuration conf, User user) {
this.conf = conf;
this.user = user;
@@ -105,7 +102,8 @@ class AsyncConnectionImpl implements AsyncConnection {
this.rpcClient = RpcClientFactory.createClient(conf, clusterId);
this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.hostnameCanChange = conf.getBoolean(RESOLVE_HOSTNAME_ON_FAIL_KEY, true);
- this.rpcTimeout = conf.getInt(HBASE_RPC_TIMEOUT_KEY, DEFAULT_HBASE_RPC_TIMEOUT);
+ this.rpcTimeout = (int) Math.min(Integer.MAX_VALUE,
+ TimeUnit.NANOSECONDS.toMillis(connConf.getRpcTimeoutNs()));
this.locator = new AsyncRegionLocator(this, RETRY_TIMER);
this.callerFactory = new AsyncRpcRetryingCallerFactory(this, RETRY_TIMER);
if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
@@ -152,12 +150,25 @@ class AsyncConnectionImpl implements AsyncConnection {
}
@Override
- public RawAsyncTable getRawTable(TableName tableName) {
- return new RawAsyncTableImpl(this, tableName);
+ public AsyncTableBuilder<RawAsyncTable> getRawTableBuilder(TableName tableName) {
+ return new AsyncTableBuilderBase<RawAsyncTable>(tableName, connConf) {
+
+ @Override
+ public RawAsyncTable build() {
+ return new RawAsyncTableImpl(AsyncConnectionImpl.this, this);
+ }
+ };
}
@Override
- public AsyncTable getTable(TableName tableName, ExecutorService pool) {
- return new AsyncTableImpl(this, tableName, pool);
+ public AsyncTableBuilder<AsyncTable> getTableBuilder(TableName tableName, ExecutorService pool) {
+ return new AsyncTableBuilderBase<AsyncTable>(tableName, connConf) {
+
+ @Override
+ public AsyncTable build() {
+ RawAsyncTableImpl rawTable = new RawAsyncTableImpl(AsyncConnectionImpl.this, this);
+ return new AsyncTableImpl(AsyncConnectionImpl.this, rawTable, pool);
+ }
+ };
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
index 7a45ae3..7030eac 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionLocator.java
@@ -71,7 +71,7 @@ class AsyncRegionLocator {
future.completeExceptionally(new TimeoutIOException(timeoutMsg.get()));
}, timeoutNs, TimeUnit.NANOSECONDS);
return future.whenComplete((loc, error) -> {
- if (error.getClass() != TimeoutIOException.class) {
+ if (error != null && error.getClass() != TimeoutIOException.class) {
// cancel timeout task if we are not completed by it.
timeoutTask.cancel();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
index 55c56ab..76b6a33 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCallerFactory.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.client;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.*;
import io.netty.util.HashedWheelTimer;
@@ -46,7 +47,16 @@ class AsyncRpcRetryingCallerFactory {
this.retryTimer = retryTimer;
}
- public class SingleRequestCallerBuilder<T> {
+ private abstract class BuilderBase {
+
+ protected long pauseNs = conn.connConf.getPauseNs();
+
+ protected int maxAttempts = retries2Attempts(conn.connConf.getMaxRetries());
+
+ protected int startLogErrorsCnt = conn.connConf.getStartLogErrorsCnt();
+ }
+
+ public class SingleRequestCallerBuilder<T> extends BuilderBase {
private TableName tableName;
@@ -91,12 +101,26 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
+ public SingleRequestCallerBuilder<T> pause(long pause, TimeUnit unit) {
+ this.pauseNs = unit.toNanos(pause);
+ return this;
+ }
+
+ public SingleRequestCallerBuilder<T> maxAttempts(int maxAttempts) {
+ this.maxAttempts = maxAttempts;
+ return this;
+ }
+
+ public SingleRequestCallerBuilder<T> startLogErrorsCnt(int startLogErrorsCnt) {
+ this.startLogErrorsCnt = startLogErrorsCnt;
+ return this;
+ }
+
public AsyncSingleRequestRpcRetryingCaller<T> build() {
return new AsyncSingleRequestRpcRetryingCaller<>(retryTimer, conn,
checkNotNull(tableName, "tableName is null"), checkNotNull(row, "row is null"),
checkNotNull(locateType, "locateType is null"), checkNotNull(callable, "action is null"),
- conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
- rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
+ pauseNs, maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
/**
@@ -114,7 +138,7 @@ class AsyncRpcRetryingCallerFactory {
return new SingleRequestCallerBuilder<>();
}
- public class SmallScanCallerBuilder {
+ public class SmallScanCallerBuilder extends BuilderBase {
private TableName tableName;
@@ -151,12 +175,27 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
+ public SmallScanCallerBuilder pause(long pause, TimeUnit unit) {
+ this.pauseNs = unit.toNanos(pause);
+ return this;
+ }
+
+ public SmallScanCallerBuilder maxAttempts(int maxAttempts) {
+ this.maxAttempts = maxAttempts;
+ return this;
+ }
+
+ public SmallScanCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
+ this.startLogErrorsCnt = startLogErrorsCnt;
+ return this;
+ }
+
public AsyncSmallScanRpcRetryingCaller build() {
TableName tableName = checkNotNull(this.tableName, "tableName is null");
Scan scan = checkNotNull(this.scan, "scan is null");
checkArgument(limit > 0, "invalid limit %d", limit);
- return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, scanTimeoutNs,
- rpcTimeoutNs);
+ return new AsyncSmallScanRpcRetryingCaller(conn, tableName, scan, limit, pauseNs, maxAttempts,
+ scanTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
/**
@@ -174,7 +213,7 @@ class AsyncRpcRetryingCallerFactory {
return new SmallScanCallerBuilder();
}
- public class ScanSingleRegionCallerBuilder {
+ public class ScanSingleRegionCallerBuilder extends BuilderBase {
private long scannerId = -1L;
@@ -232,15 +271,29 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
+ public ScanSingleRegionCallerBuilder pause(long pause, TimeUnit unit) {
+ this.pauseNs = unit.toNanos(pause);
+ return this;
+ }
+
+ public ScanSingleRegionCallerBuilder maxAttempts(int maxAttempts) {
+ this.maxAttempts = maxAttempts;
+ return this;
+ }
+
+ public ScanSingleRegionCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
+ this.startLogErrorsCnt = startLogErrorsCnt;
+ return this;
+ }
+
public AsyncScanSingleRegionRpcRetryingCaller build() {
checkArgument(scannerId >= 0, "invalid scannerId %d", scannerId);
return new AsyncScanSingleRegionRpcRetryingCaller(retryTimer, conn,
checkNotNull(scan, "scan is null"), scannerId,
checkNotNull(resultCache, "resultCache is null"),
checkNotNull(consumer, "consumer is null"), checkNotNull(stub, "stub is null"),
- checkNotNull(loc, "location is null"), conn.connConf.getPauseNs(),
- conn.connConf.getMaxRetries(), scanTimeoutNs, rpcTimeoutNs,
- conn.connConf.getStartLogErrorsCnt());
+ checkNotNull(loc, "location is null"), pauseNs, maxAttempts, scanTimeoutNs, rpcTimeoutNs,
+ startLogErrorsCnt);
}
/**
@@ -258,7 +311,7 @@ class AsyncRpcRetryingCallerFactory {
return new ScanSingleRegionCallerBuilder();
}
- public class BatchCallerBuilder {
+ public class BatchCallerBuilder extends BuilderBase {
private TableName tableName;
@@ -266,9 +319,7 @@ class AsyncRpcRetryingCallerFactory {
private long operationTimeoutNs = -1L;
- private long readRpcTimeoutNs = -1L;
-
- private long writeRpcTimeoutNs = -1L;
+ private long rpcTimeoutNs = -1L;
public BatchCallerBuilder table(TableName tableName) {
this.tableName = tableName;
@@ -285,20 +336,29 @@ class AsyncRpcRetryingCallerFactory {
return this;
}
- public BatchCallerBuilder readRpcTimeout(long rpcTimeout, TimeUnit unit) {
- this.readRpcTimeoutNs = unit.toNanos(rpcTimeout);
+ public BatchCallerBuilder rpcTimeout(long rpcTimeout, TimeUnit unit) {
+ this.rpcTimeoutNs = unit.toNanos(rpcTimeout);
+ return this;
+ }
+
+ public BatchCallerBuilder pause(long pause, TimeUnit unit) {
+ this.pauseNs = unit.toNanos(pause);
+ return this;
+ }
+
+ public BatchCallerBuilder maxAttempts(int maxAttempts) {
+ this.maxAttempts = maxAttempts;
return this;
}
- public BatchCallerBuilder writeRpcTimeout(long rpcTimeout, TimeUnit unit) {
- this.writeRpcTimeoutNs = unit.toNanos(rpcTimeout);
+ public BatchCallerBuilder startLogErrorsCnt(int startLogErrorsCnt) {
+ this.startLogErrorsCnt = startLogErrorsCnt;
return this;
}
public <T> AsyncBatchRpcRetryingCaller<T> build() {
- return new AsyncBatchRpcRetryingCaller<T>(retryTimer, conn, tableName, actions,
- conn.connConf.getPauseNs(), conn.connConf.getMaxRetries(), operationTimeoutNs,
- readRpcTimeoutNs, writeRpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
+ return new AsyncBatchRpcRetryingCaller<T>(retryTimer, conn, tableName, actions, pauseNs,
+ maxAttempts, operationTimeoutNs, rpcTimeoutNs, startLogErrorsCnt);
}
public <T> List<CompletableFuture<T>> call() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index dae88a7..5d3b736 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -108,7 +108,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
public AsyncScanSingleRegionRpcRetryingCaller(HashedWheelTimer retryTimer,
AsyncConnectionImpl conn, Scan scan, long scannerId, ScanResultCache resultCache,
RawScanResultConsumer consumer, Interface stub, HRegionLocation loc, long pauseNs,
- int maxRetries, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
+ int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs, int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.scan = scan;
this.scannerId = scannerId;
@@ -117,7 +117,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
this.stub = stub;
this.loc = loc;
this.pauseNs = pauseNs;
- this.maxAttempts = retries2Attempts(maxRetries);
+ this.maxAttempts = maxAttempts;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
this.startLogErrorsCnt = startLogErrorsCnt;
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
index 04e69af..4ce6a18 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSingleRequestRpcRetryingCaller.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.client.ConnectionUtils.SLEEP_DELTA_NS;
import static org.apache.hadoop.hbase.client.ConnectionUtils.getPauseTime;
import static org.apache.hadoop.hbase.client.ConnectionUtils.resetController;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
import static org.apache.hadoop.hbase.client.ConnectionUtils.translateException;
import io.netty.util.HashedWheelTimer;
@@ -90,7 +89,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
public AsyncSingleRequestRpcRetryingCaller(HashedWheelTimer retryTimer, AsyncConnectionImpl conn,
TableName tableName, byte[] row, RegionLocateType locateType, Callable<T> callable,
- long pauseNs, int maxRetries, long operationTimeoutNs, long rpcTimeoutNs,
+ long pauseNs, int maxAttempts, long operationTimeoutNs, long rpcTimeoutNs,
int startLogErrorsCnt) {
this.retryTimer = retryTimer;
this.conn = conn;
@@ -99,7 +98,7 @@ class AsyncSingleRequestRpcRetryingCaller<T> {
this.locateType = locateType;
this.callable = callable;
this.pauseNs = pauseNs;
- this.maxAttempts = retries2Attempts(maxRetries);
+ this.maxAttempts = maxAttempts;
this.operationTimeoutNs = operationTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
this.startLogErrorsCnt = startLogErrorsCnt;
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
index 6ffa30a..98a276f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncSmallScanRpcRetryingCaller.java
@@ -57,6 +57,12 @@ class AsyncSmallScanRpcRetryingCaller {
private final long rpcTimeoutNs;
+ private final long pauseNs;
+
+ private final int maxAttempts;
+
+ private final int startLogErrosCnt;
+
private final Function<HRegionInfo, Boolean> nextScan;
private final List<Result> resultList;
@@ -64,13 +70,17 @@ class AsyncSmallScanRpcRetryingCaller {
private final CompletableFuture<List<Result>> future;
public AsyncSmallScanRpcRetryingCaller(AsyncConnectionImpl conn, TableName tableName, Scan scan,
- int limit, long scanTimeoutNs, long rpcTimeoutNs) {
+ int limit, long pauseNs, int maxAttempts, long scanTimeoutNs, long rpcTimeoutNs,
+ int startLogErrosCnt) {
this.conn = conn;
this.tableName = tableName;
this.scan = scan;
this.limit = limit;
this.scanTimeoutNs = scanTimeoutNs;
this.rpcTimeoutNs = rpcTimeoutNs;
+ this.pauseNs = pauseNs;
+ this.maxAttempts = maxAttempts;
+ this.startLogErrosCnt = startLogErrosCnt;
if (scan.isReversed()) {
this.nextScan = this::reversedNextScan;
} else {
@@ -146,7 +156,8 @@ class AsyncSmallScanRpcRetryingCaller {
private void scan() {
conn.callerFactory.<SmallScanResponse> single().table(tableName).row(scan.getStartRow())
.locateType(getLocateType(scan)).rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).action(this::scan).call()
+ .operationTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrosCnt).action(this::scan).call()
.whenComplete((resp, error) -> {
if (error != null) {
future.completeExceptionally(error);
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
index 893beb9..402ad64 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTable.java
@@ -23,9 +23,11 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* The asynchronous table for normal users.
* <p>
+ * The implementation is required to be thread safe.
+ * <p>
* The implementation should make sure that user can do everything they want to the returned
- * {@code CompletableFuture} without break anything. Usually the implementation will require user to
- * provide a {@code ExecutorService}.
+ * {@code CompletableFuture} without breaking anything. Usually the implementation will require user
+ * to provide a {@code ExecutorService}.
*/
@InterfaceAudience.Public
@InterfaceStability.Unstable
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
index d80627f..d82fa22 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBase.java
@@ -18,9 +18,8 @@
package org.apache.hadoop.hbase.client;
import static java.util.stream.Collectors.toList;
+import static org.apache.hadoop.hbase.client.ConnectionUtils.allOf;
import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.voidBatch;
-import static org.apache.hadoop.hbase.client.ConnectionUtils.voidBatchAll;
import com.google.common.base.Preconditions;
@@ -39,10 +38,9 @@ import org.apache.hadoop.hbase.util.Bytes;
* The base interface for asynchronous version of Table. Obtain an instance from a
* {@link AsyncConnection}.
* <p>
- * The implementation is NOT required to be thread safe. Do NOT access it from multiple threads
- * concurrently.
+ * The implementation is required to be thread safe.
* <p>
- * Usually the implementations will not throw any exception directly, you need to get the exception
+ * Usually the implementation will not throw any exception directly. You need to get the exception
* from the returned {@link CompletableFuture}.
*/
@InterfaceAudience.Public
@@ -62,12 +60,12 @@ public interface AsyncTableBase {
Configuration getConfiguration();
/**
- * Set timeout of each rpc read request in operations of this Table instance, will override the
- * value of {@code hbase.rpc.read.timeout} in configuration. If a rpc read request waiting too
- * long, it will stop waiting and send a new request to retry until retries exhausted or operation
- * timeout reached.
+ * Get timeout of each rpc request in this Table instance. It will be overridden by a more
+ * specific rpc timeout config such as readRpcTimeout or writeRpcTimeout.
+ * @see #getReadRpcTimeout(TimeUnit)
+ * @see #getWriteRpcTimeout(TimeUnit)
*/
- void setReadRpcTimeout(long timeout, TimeUnit unit);
+ long getRpcTimeout(TimeUnit unit);
/**
* Get timeout of each rpc read request in this Table instance.
@@ -75,47 +73,18 @@ public interface AsyncTableBase {
long getReadRpcTimeout(TimeUnit unit);
/**
- * Set timeout of each rpc write request in operations of this Table instance, will override the
- * value of {@code hbase.rpc.write.timeout} in configuration. If a rpc write request waiting too
- * long, it will stop waiting and send a new request to retry until retries exhausted or operation
- * timeout reached.
- */
- void setWriteRpcTimeout(long timeout, TimeUnit unit);
-
- /**
* Get timeout of each rpc write request in this Table instance.
*/
long getWriteRpcTimeout(TimeUnit unit);
/**
- * Set timeout of each operation in this Table instance, will override the value of
- * {@code hbase.client.operation.timeout} in configuration.
- * <p>
- * Operation timeout is a top-level restriction that makes sure an operation will not be blocked
- * more than this. In each operation, if rpc request fails because of timeout or other reason, it
- * will retry until success or throw a RetriesExhaustedException. But if the total time elapsed
- * reach the operation timeout before retries exhausted, it will break early and throw
- * SocketTimeoutException.
- */
- void setOperationTimeout(long timeout, TimeUnit unit);
-
- /**
* Get timeout of each operation in Table instance.
*/
long getOperationTimeout(TimeUnit unit);
/**
- * Set timeout of a single operation in a scan, such as openScanner and next. Will override the
- * value {@code hbase.client.scanner.timeout.period} in configuration.
- * <p>
- * Generally a scan will never timeout after we add heartbeat support unless the region is
- * crashed. The {@code scanTimeout} works like the {@code operationTimeout} for each single
- * operation in a scan.
- */
- void setScanTimeout(long timeout, TimeUnit unit);
-
- /**
- * Get the timeout of a single operation in a scan.
+ * Get the timeout of a single operation in a scan. It works like operation timeout for other
+ * operations.
*/
long getScanTimeout(TimeUnit unit);
@@ -353,29 +322,6 @@ public interface AsyncTableBase {
CompletableFuture<List<Result>> smallScan(Scan scan, int limit);
/**
- * Extracts certain cells from the given rows, in batch.
- * <p>
- * Notice that you may not get all the results with this function, which means some of the
- * returned {@link CompletableFuture}s may succeed while some of the other returned
- * {@link CompletableFuture}s may fail.
- * @param gets The objects that specify what data to fetch and from which rows.
- * @return A list of {@link CompletableFuture}s that represent the result for each get.
- */
- default List<CompletableFuture<Result>> get(List<Get> gets) {
- return batch(gets);
- }
-
- /**
- * A simple version for batch get. It will fail if there are any failures and you will get the
- * whole result list at once if the operation is succeeded.
- * @param gets The objects that specify what data to fetch and from which rows.
- * @return A {@link CompletableFuture} that wrapper the result list.
- */
- default CompletableFuture<List<Result>> getAll(List<Get> gets) {
- return batchAll(gets);
- }
-
- /**
* Test for the existence of columns in the table, as specified by the Gets.
* <p>
* This will return a list of booleans. Each value will be true if the related Get matches one or
@@ -386,8 +332,8 @@ public interface AsyncTableBase {
* @return A list of {@link CompletableFuture}s that represent the existence for each get.
*/
default List<CompletableFuture<Boolean>> exists(List<Get> gets) {
- return get(toCheckExistenceOnly(gets)).stream().
- <CompletableFuture<Boolean>>map(f -> f.thenApply(r -> r.getExists())).collect(toList());
+ return get(toCheckExistenceOnly(gets)).stream()
+ .<CompletableFuture<Boolean>> map(f -> f.thenApply(r -> r.getExists())).collect(toList());
}
/**
@@ -397,8 +343,28 @@ public interface AsyncTableBase {
* @return A {@link CompletableFuture} that wrapper the result boolean list.
*/
default CompletableFuture<List<Boolean>> existsAll(List<Get> gets) {
- return getAll(toCheckExistenceOnly(gets))
- .thenApply(l -> l.stream().map(r -> r.getExists()).collect(toList()));
+ return allOf(exists(gets));
+ }
+
+ /**
+ * Extracts certain cells from the given rows, in batch.
+ * <p>
+ * Notice that you may not get all the results with this function, which means some of the
+ * returned {@link CompletableFuture}s may succeed while some of the other returned
+ * {@link CompletableFuture}s may fail.
+ * @param gets The objects that specify what data to fetch and from which rows.
+ * @return A list of {@link CompletableFuture}s that represent the result for each get.
+ */
+ List<CompletableFuture<Result>> get(List<Get> gets);
+
+ /**
+ * A simple version for batch get. It will fail if there are any failures and you will get the
+ * whole result list at once if the operation is succeeded.
+ * @param gets The objects that specify what data to fetch and from which rows.
+ * @return A {@link CompletableFuture} that wrapper the result list.
+ */
+ default CompletableFuture<List<Result>> getAll(List<Get> gets) {
+ return allOf(get(gets));
}
/**
@@ -406,9 +372,7 @@ public interface AsyncTableBase {
* @param puts The list of mutations to apply.
* @return A list of {@link CompletableFuture}s that represent the result for each put.
*/
- default List<CompletableFuture<Void>> put(List<Put> puts) {
- return voidBatch(this, puts);
- }
+ List<CompletableFuture<Void>> put(List<Put> puts);
/**
* A simple version of batch put. It will fail if there are any failures.
@@ -416,7 +380,7 @@ public interface AsyncTableBase {
* @return A {@link CompletableFuture} that always returns null when complete normally.
*/
default CompletableFuture<Void> putAll(List<Put> puts) {
- return voidBatchAll(this, puts);
+ return allOf(put(puts)).thenApply(r -> null);
}
/**
@@ -424,9 +388,7 @@ public interface AsyncTableBase {
* @param deletes list of things to delete.
* @return A list of {@link CompletableFuture}s that represent the result for each delete.
*/
- default List<CompletableFuture<Void>> delete(List<Delete> deletes) {
- return voidBatch(this, deletes);
- }
+ List<CompletableFuture<Void>> delete(List<Delete> deletes);
/**
* A simple version of batch delete. It will fail if there are any failures.
@@ -434,7 +396,7 @@ public interface AsyncTableBase {
* @return A {@link CompletableFuture} that always returns null when complete normally.
*/
default CompletableFuture<Void> deleteAll(List<Delete> deletes) {
- return voidBatchAll(this, deletes);
+ return allOf(delete(deletes)).thenApply(r -> null);
}
/**
@@ -454,8 +416,6 @@ public interface AsyncTableBase {
* @return A list of the result for the actions. Wrapped by a {@link CompletableFuture}.
*/
default <T> CompletableFuture<List<T>> batchAll(List<? extends Row> actions) {
- List<CompletableFuture<T>> futures = batch(actions);
- return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
- .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
+ return allOf(batch(actions));
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
new file mode 100644
index 0000000..2330855
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilder.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * For creating {@link AsyncTable} or {@link RawAsyncTable}.
+ * <p>
+ * The implementation should have default configurations set before returning the builder to user.
+ * So users are free to only set the configs they care about to create a new
+ * AsyncTable/RawAsyncTable instance.
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Unstable
+public interface AsyncTableBuilder<T extends AsyncTableBase> {
+
+ /**
+ * Set timeout for a whole operation such as get, put or delete. Notice that scan will not be
+ * effected by this value, see scanTimeoutNs.
+ * <p>
+ * Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
+ * we will stop retrying when we reach any of the limitations.
+ * @see #setMaxAttempts(int)
+ * @see #setMaxRetries(int)
+ * @see #setScanTimeout(long, TimeUnit)
+ */
+ AsyncTableBuilder<T> setOperationTimeout(long timeout, TimeUnit unit);
+
+ /**
+ * As now we have heartbeat support for scan, ideally a scan will never timeout unless the RS is
+ * crash. The RS will always return something before the rpc timed out or scan timed out to tell
+ * the client that it is still alive. The scan timeout is used as operation timeout for every
+ * operation in a scan, such as openScanner or next.
+ * @see #setScanTimeout(long, TimeUnit)
+ */
+ AsyncTableBuilder<T> setScanTimeout(long timeout, TimeUnit unit);
+
+ /**
+ * Set timeout for each rpc request.
+ * <p>
+ * Notice that this will <strong>NOT</strong> change the rpc timeout for read(get, scan) request
+ * and write request(put, delete).
+ */
+ AsyncTableBuilder<T> setRpcTimeout(long timeout, TimeUnit unit);
+
+ /**
+ * Set timeout for each read(get, scan) rpc request.
+ */
+ AsyncTableBuilder<T> setReadRpcTimeout(long timeout, TimeUnit unit);
+
+ /**
+ * Set timeout for each write(put, delete) rpc request.
+ */
+ AsyncTableBuilder<T> setWriteRpcTimeout(long timeout, TimeUnit unit);
+
+ /**
+ * Set the base pause time for retrying. We use an exponential policy to generate sleep time when
+ * retrying.
+ */
+ AsyncTableBuilder<T> setRetryPause(long pause, TimeUnit unit);
+
+ /**
+ * Set the max retry times for an operation. Usually it is the max attempt times minus 1.
+ * <p>
+ * Operation timeout and max attempt times(or max retry times) are both limitations for retrying,
+ * we will stop retrying when we reach any of the limitations.
+ * @see #setMaxAttempts(int)
+ * @see #setOperationTimeout(long, TimeUnit)
+ */
+ default AsyncTableBuilder<T> setMaxRetries(int maxRetries) {
+ return setMaxAttempts(retries2Attempts(maxRetries));
+ }
+
+ /**
+ * Set the max attempt times for an operation. Usually it is the max retry times plus 1. Operation
+ * timeout and max attempt times(or max retry times) are both limitations for retrying, we will
+ * stop retrying when we reach any of the limitations.
+ * @see #setMaxRetries(int)
+ * @see #setOperationTimeout(long, TimeUnit)
+ */
+ AsyncTableBuilder<T> setMaxAttempts(int maxAttempts);
+
+ /**
+ * Set the number of retries that are allowed before we start to log.
+ */
+ AsyncTableBuilder<T> setStartLogErrorsCnt(int startLogErrorsCnt);
+
+ /**
+ * Create the {@link AsyncTable} or {@link RawAsyncTable} instance.
+ */
+ T build();
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
new file mode 100644
index 0000000..766895e
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableBuilderBase.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Base class for all asynchronous table builders.
+ */
+@InterfaceAudience.Private
+abstract class AsyncTableBuilderBase<T extends AsyncTableBase> implements AsyncTableBuilder<T> {
+
+ protected TableName tableName;
+
+ protected long operationTimeoutNs;
+
+ protected long scanTimeoutNs;
+
+ protected long rpcTimeoutNs;
+
+ protected long readRpcTimeoutNs;
+
+ protected long writeRpcTimeoutNs;
+
+ protected long pauseNs;
+
+ protected int maxAttempts;
+
+ protected int startLogErrorsCnt;
+
+ AsyncTableBuilderBase(TableName tableName, AsyncConnectionConfiguration connConf) {
+ this.tableName = tableName;
+ this.operationTimeoutNs = tableName.isSystemTable() ? connConf.getMetaOperationTimeoutNs()
+ : connConf.getOperationTimeoutNs();
+ this.scanTimeoutNs = connConf.getScanTimeoutNs();
+ this.rpcTimeoutNs = connConf.getRpcTimeoutNs();
+ this.readRpcTimeoutNs = connConf.getReadRpcTimeoutNs();
+ this.writeRpcTimeoutNs = connConf.getWriteRpcTimeoutNs();
+ this.pauseNs = connConf.getPauseNs();
+ this.maxAttempts = retries2Attempts(connConf.getMaxRetries());
+ this.startLogErrorsCnt = connConf.getStartLogErrorsCnt();
+ }
+
+ @Override
+ public AsyncTableBuilderBase<T> setOperationTimeout(long timeout, TimeUnit unit) {
+ this.operationTimeoutNs = unit.toNanos(timeout);
+ return this;
+ }
+
+ @Override
+ public AsyncTableBuilderBase<T> setScanTimeout(long timeout, TimeUnit unit) {
+ this.scanTimeoutNs = unit.toNanos(timeout);
+ return this;
+ }
+
+ @Override
+ public AsyncTableBuilderBase<T> setRpcTimeout(long timeout, TimeUnit unit) {
+ this.rpcTimeoutNs = unit.toNanos(timeout);
+ return this;
+ }
+
+ @Override
+ public AsyncTableBuilderBase<T> setReadRpcTimeout(long timeout, TimeUnit unit) {
+ this.readRpcTimeoutNs = unit.toNanos(timeout);
+ return this;
+ }
+
+ @Override
+ public AsyncTableBuilderBase<T> setWriteRpcTimeout(long timeout, TimeUnit unit) {
+ this.writeRpcTimeoutNs = unit.toNanos(timeout);
+ return this;
+ }
+
+ @Override
+ public AsyncTableBuilderBase<T> setRetryPause(long pause, TimeUnit unit) {
+ this.pauseNs = unit.toNanos(pause);
+ return this;
+ }
+
+ @Override
+ public AsyncTableBuilderBase<T> setMaxAttempts(int maxAttempts) {
+ this.maxAttempts = maxAttempts;
+ return this;
+ }
+
+ @Override
+ public AsyncTableBuilderBase<T> setStartLogErrorsCnt(int startLogErrorsCnt) {
+ this.startLogErrorsCnt = startLogErrorsCnt;
+ return this;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
index 7281185..7cd257c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncTableImpl.java
@@ -22,7 +22,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
+import static java.util.stream.Collectors.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
@@ -42,8 +42,8 @@ class AsyncTableImpl implements AsyncTable {
private final long defaultScannerMaxResultSize;
- public AsyncTableImpl(AsyncConnectionImpl conn, TableName tableName, ExecutorService pool) {
- this.rawTable = conn.getRawTable(tableName);
+ AsyncTableImpl(AsyncConnectionImpl conn, RawAsyncTable rawTable, ExecutorService pool) {
+ this.rawTable = rawTable;
this.pool = pool;
this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
}
@@ -59,8 +59,8 @@ class AsyncTableImpl implements AsyncTable {
}
@Override
- public void setReadRpcTimeout(long timeout, TimeUnit unit) {
- rawTable.setReadRpcTimeout(timeout, unit);
+ public long getRpcTimeout(TimeUnit unit) {
+ return rawTable.getRpcTimeout(unit);
}
@Override
@@ -69,31 +69,16 @@ class AsyncTableImpl implements AsyncTable {
}
@Override
- public void setWriteRpcTimeout(long timeout, TimeUnit unit) {
- rawTable.setWriteRpcTimeout(timeout, unit);
- }
-
- @Override
public long getWriteRpcTimeout(TimeUnit unit) {
return rawTable.getWriteRpcTimeout(unit);
}
@Override
- public void setOperationTimeout(long timeout, TimeUnit unit) {
- rawTable.setOperationTimeout(timeout, unit);
- }
-
- @Override
public long getOperationTimeout(TimeUnit unit) {
return rawTable.getOperationTimeout(unit);
}
@Override
- public void setScanTimeout(long timeout, TimeUnit unit) {
- rawTable.setScanTimeout(timeout, unit);
- }
-
- @Override
public long getScanTimeout(TimeUnit unit) {
return rawTable.getScanTimeout(unit);
}
@@ -194,7 +179,22 @@ class AsyncTableImpl implements AsyncTable {
}
@Override
+ public List<CompletableFuture<Result>> get(List<Get> gets) {
+ return rawTable.get(gets).stream().map(this::wrap).collect(toList());
+ }
+
+ @Override
+ public List<CompletableFuture<Void>> put(List<Put> puts) {
+ return rawTable.put(puts).stream().map(this::wrap).collect(toList());
+ }
+
+ @Override
+ public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
+ return rawTable.delete(deletes).stream().map(this::wrap).collect(toList());
+ }
+
+ @Override
public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
- return rawTable.<T> batch(actions).stream().map(this::wrap).collect(Collectors.toList());
+ return rawTable.<T> batch(actions).stream().map(this::wrap).collect(toList());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 6f4a844..1abf3f2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -342,16 +342,6 @@ public final class ConnectionUtils {
return gets.stream().map(ConnectionUtils::toCheckExistenceOnly).collect(toList());
}
- static List<CompletableFuture<Void>> voidBatch(AsyncTableBase table,
- List<? extends Row> actions) {
- return table.<Object> batch(actions).stream().map(f -> f.<Void> thenApply(r -> null))
- .collect(toList());
- }
-
- static CompletableFuture<Void> voidBatchAll(AsyncTableBase table, List<? extends Row> actions) {
- return table.<Object> batchAll(actions).thenApply(r -> null);
- }
-
static RegionLocateType getLocateType(Scan scan) {
if (scan.isReversed()) {
if (isEmptyStartRow(scan.getStartRow())) {
@@ -389,4 +379,9 @@ public final class ConnectionUtils {
// the region.
return Bytes.compareTo(info.getStartKey(), scan.getStopRow()) <= 0;
}
+
+ static <T> CompletableFuture<List<T>> allOf(List<CompletableFuture<T>> futures) {
+ return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
+ .thenApply(v -> futures.stream().map(f -> f.getNow(null)).collect(toList()));
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
index 0c292a6..67099e8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTable.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.hbase.classification.InterfaceStability;
/**
* A low level asynchronous table.
* <p>
+ * The implementation is required to be thread safe.
+ * <p>
* The returned {@code CompletableFuture} will be finished directly in the rpc framework's callback
* thread, so typically you should not do any time consuming work inside these methods, otherwise
* you will be likely to block at least one connection to RS(even more if the rpc framework uses
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
index 347c85b..d9d2d35 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RawAsyncTableImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;
+import static java.util.stream.Collectors.toList;
import static org.apache.hadoop.hbase.client.ConnectionUtils.checkHasFamilies;
import java.io.IOException;
@@ -67,24 +68,35 @@ class RawAsyncTableImpl implements RawAsyncTable {
private final long defaultScannerMaxResultSize;
- private long readRpcTimeoutNs;
+ private final long rpcTimeoutNs;
- private long writeRpcTimeoutNs;
+ private final long readRpcTimeoutNs;
- private long operationTimeoutNs;
+ private final long writeRpcTimeoutNs;
- private long scanTimeoutNs;
+ private final long operationTimeoutNs;
- public RawAsyncTableImpl(AsyncConnectionImpl conn, TableName tableName) {
+ private final long scanTimeoutNs;
+
+ private final long pauseNs;
+
+ private final int maxAttempts;
+
+ private final int startLogErrorsCnt;
+
+ RawAsyncTableImpl(AsyncConnectionImpl conn, AsyncTableBuilderBase<?> builder) {
this.conn = conn;
- this.tableName = tableName;
- this.readRpcTimeoutNs = conn.connConf.getReadRpcTimeoutNs();
- this.writeRpcTimeoutNs = conn.connConf.getWriteRpcTimeoutNs();
- this.operationTimeoutNs = tableName.isSystemTable() ? conn.connConf.getMetaOperationTimeoutNs()
- : conn.connConf.getOperationTimeoutNs();
+ this.tableName = builder.tableName;
+ this.rpcTimeoutNs = builder.rpcTimeoutNs;
+ this.readRpcTimeoutNs = builder.readRpcTimeoutNs;
+ this.writeRpcTimeoutNs = builder.writeRpcTimeoutNs;
+ this.operationTimeoutNs = builder.operationTimeoutNs;
+ this.scanTimeoutNs = builder.scanTimeoutNs;
+ this.pauseNs = builder.pauseNs;
+ this.maxAttempts = builder.maxAttempts;
+ this.startLogErrorsCnt = builder.startLogErrorsCnt;
this.defaultScannerCaching = conn.connConf.getScannerCaching();
this.defaultScannerMaxResultSize = conn.connConf.getScannerMaxResultSize();
- this.scanTimeoutNs = conn.connConf.getScanTimeoutNs();
}
@Override
@@ -178,7 +190,9 @@ class RawAsyncTableImpl implements RawAsyncTable {
private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, long rpcTimeoutNs) {
return conn.callerFactory.<T> single().table(tableName).row(row)
.rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS)
- .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS);
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .pause(pauseNs, TimeUnit.NANOSECONDS).maxAttempts(maxAttempts)
+ .startLogErrorsCnt(startLogErrorsCnt);
}
private <T> SingleRequestCallerBuilder<T> newCaller(Row row, long rpcTimeoutNs) {
@@ -214,7 +228,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
@Override
public CompletableFuture<Result> append(Append append) {
checkHasFamilies(append);
- return this.<Result> newCaller(append, writeRpcTimeoutNs)
+ return this.<Result> newCaller(append, rpcTimeoutNs)
.action((controller, loc, stub) -> this.<Append, Result> noncedMutate(controller, loc, stub,
append, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
.call();
@@ -223,7 +237,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
@Override
public CompletableFuture<Result> increment(Increment increment) {
checkHasFamilies(increment);
- return this.<Result> newCaller(increment, writeRpcTimeoutNs)
+ return this.<Result> newCaller(increment, rpcTimeoutNs)
.action((controller, loc, stub) -> this.<Increment, Result> noncedMutate(controller, loc,
stub, increment, RequestConverter::buildMutateRequest, RawAsyncTableImpl::toResult))
.call();
@@ -232,7 +246,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
@Override
public CompletableFuture<Boolean> checkAndPut(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, Put put) {
- return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
+ return this.<Boolean> newCaller(row, rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Put, Boolean> mutate(controller, loc,
stub, put,
(rn, p) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
@@ -244,7 +258,7 @@ class RawAsyncTableImpl implements RawAsyncTable {
@Override
public CompletableFuture<Boolean> checkAndDelete(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, Delete delete) {
- return this.<Boolean> newCaller(row, writeRpcTimeoutNs)
+ return this.<Boolean> newCaller(row, rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Delete, Boolean> mutate(controller,
loc, stub, delete,
(rn, d) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
@@ -303,20 +317,18 @@ class RawAsyncTableImpl implements RawAsyncTable {
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(rn, rm);
regionMutationBuilder.setAtomic(true);
return MultiRequest.newBuilder().addRegionAction(regionMutationBuilder.build()).build();
- }, (resp) -> {
- return null;
- })).call();
+ }, resp -> null)).call();
}
@Override
public CompletableFuture<Boolean> checkAndMutate(byte[] row, byte[] family, byte[] qualifier,
CompareOp compareOp, byte[] value, RowMutations mutation) {
- return this.<Boolean> newCaller(mutation, writeRpcTimeoutNs)
+ return this.<Boolean> newCaller(mutation, rpcTimeoutNs)
.action((controller, loc, stub) -> RawAsyncTableImpl.<Boolean> mutateRow(controller, loc,
stub, mutation,
(rn, rm) -> RequestConverter.buildMutateRequest(rn, row, family, qualifier,
new BinaryComparator(value), CompareType.valueOf(compareOp.name()), rm),
- (resp) -> resp.getExists()))
+ resp -> resp.getExists()))
.call();
}
@@ -349,7 +361,8 @@ class RawAsyncTableImpl implements RawAsyncTable {
}
return conn.callerFactory.smallScan().table(tableName).setScan(setDefaultScanConfig(scan))
.limit(limit).scanTimeout(scanTimeoutNs, TimeUnit.NANOSECONDS)
- .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
+ .rpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
}
public void scan(Scan scan, RawScanResultConsumer consumer) {
@@ -362,55 +375,63 @@ class RawAsyncTableImpl implements RawAsyncTable {
}
}
scan = setDefaultScanConfig(scan);
- new AsyncClientScanner(scan, consumer, tableName, conn, scanTimeoutNs, readRpcTimeoutNs)
- .start();
+ new AsyncClientScanner(scan, consumer, tableName, conn, pauseNs, maxAttempts, scanTimeoutNs,
+ readRpcTimeoutNs, startLogErrorsCnt).start();
}
@Override
- public void setReadRpcTimeout(long timeout, TimeUnit unit) {
- this.readRpcTimeoutNs = unit.toNanos(timeout);
+ public List<CompletableFuture<Result>> get(List<Get> gets) {
+ return batch(gets, readRpcTimeoutNs);
}
@Override
- public long getReadRpcTimeout(TimeUnit unit) {
- return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
+ public List<CompletableFuture<Void>> put(List<Put> puts) {
+ return voidMutate(puts);
}
-
@Override
- public void setWriteRpcTimeout(long timeout, TimeUnit unit) {
- this.writeRpcTimeoutNs = unit.toNanos(timeout);
+ public List<CompletableFuture<Void>> delete(List<Delete> deletes) {
+ return voidMutate(deletes);
}
@Override
- public long getWriteRpcTimeout(TimeUnit unit) {
- return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
+ public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
+ return batch(actions, rpcTimeoutNs);
+ }
+
+ private List<CompletableFuture<Void>> voidMutate(List<? extends Row> actions) {
+ return this.<Object> batch(actions, writeRpcTimeoutNs).stream()
+ .map(f -> f.<Void> thenApply(r -> null)).collect(toList());
+ }
+
+ private <T> List<CompletableFuture<T>> batch(List<? extends Row> actions, long rpcTimeoutNs) {
+ return conn.callerFactory.batch().table(tableName).actions(actions)
+ .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
+ .rpcTimeout(rpcTimeoutNs, TimeUnit.NANOSECONDS).pause(pauseNs, TimeUnit.NANOSECONDS)
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt).call();
}
@Override
- public void setOperationTimeout(long timeout, TimeUnit unit) {
- this.operationTimeoutNs = unit.toNanos(timeout);
+ public long getRpcTimeout(TimeUnit unit) {
+ return unit.convert(rpcTimeoutNs, TimeUnit.NANOSECONDS);
}
@Override
- public long getOperationTimeout(TimeUnit unit) {
- return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
+ public long getReadRpcTimeout(TimeUnit unit) {
+ return unit.convert(readRpcTimeoutNs, TimeUnit.NANOSECONDS);
}
@Override
- public void setScanTimeout(long timeout, TimeUnit unit) {
- this.scanTimeoutNs = unit.toNanos(timeout);
+ public long getWriteRpcTimeout(TimeUnit unit) {
+ return unit.convert(writeRpcTimeoutNs, TimeUnit.NANOSECONDS);
}
@Override
- public long getScanTimeout(TimeUnit unit) {
- return TimeUnit.NANOSECONDS.convert(scanTimeoutNs, unit);
+ public long getOperationTimeout(TimeUnit unit) {
+ return unit.convert(operationTimeoutNs, TimeUnit.NANOSECONDS);
}
@Override
- public <T> List<CompletableFuture<T>> batch(List<? extends Row> actions) {
- return conn.callerFactory.batch().table(tableName).actions(actions)
- .operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
- .readRpcTimeout(readRpcTimeoutNs, TimeUnit.NANOSECONDS)
- .writeRpcTimeout(writeRpcTimeoutNs, TimeUnit.NANOSECONDS).call();
+ public long getScanTimeout(TimeUnit unit) {
+ return unit.convert(scanTimeoutNs, TimeUnit.NANOSECONDS);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 1eec691..609e9a5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -875,10 +875,7 @@ public final class HConstants {
/**
* timeout for each RPC
- * @deprecated Use {@link #HBASE_RPC_READ_TIMEOUT_KEY} or {@link #HBASE_RPC_WRITE_TIMEOUT_KEY}
- * instead.
*/
- @Deprecated
public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
index 4a391e0..7f54449 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncSingleRequestRpcRetryingCaller.java
@@ -30,16 +30,14 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.commons.io.IOUtils;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -60,7 +58,7 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
private static byte[] VALUE = Bytes.toBytes("value");
- private AsyncConnectionImpl asyncConn;
+ private static AsyncConnectionImpl CONN;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -68,38 +66,24 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
TEST_UTIL.getAdmin().setBalancerRunning(false, true);
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
+ CONN = new AsyncConnectionImpl(TEST_UTIL.getConfiguration(), User.getCurrent());
}
@AfterClass
public static void tearDownAfterClass() throws Exception {
+ IOUtils.closeQuietly(CONN);
TEST_UTIL.shutdownMiniCluster();
}
- @After
- public void tearDown() {
- if (asyncConn != null) {
- asyncConn.close();
- asyncConn = null;
- }
- }
-
- private void initConn(int startLogErrorsCnt, long pauseMs, int maxRetires) throws IOException {
- Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
- conf.setInt(AsyncProcess.START_LOG_ERRORS_AFTER_COUNT_KEY, startLogErrorsCnt);
- conf.setLong(HConstants.HBASE_CLIENT_PAUSE, pauseMs);
- conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, maxRetires);
- asyncConn = new AsyncConnectionImpl(conf, User.getCurrent());
- }
-
@Test
public void testRegionMove() throws InterruptedException, ExecutionException, IOException {
- initConn(0, 100, 30);
// This will leave a cached entry in location cache
- HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
+ HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
int index = TEST_UTIL.getHBaseCluster().getServerWith(loc.getRegionInfo().getRegionName());
TEST_UTIL.getAdmin().move(loc.getRegionInfo().getEncodedNameAsBytes(), Bytes.toBytes(
TEST_UTIL.getHBaseCluster().getRegionServer(1 - index).getServerName().getServerName()));
- RawAsyncTable table = asyncConn.getRawTable(TABLE_NAME);
+ RawAsyncTable table = CONN.getRawTableBuilder(TABLE_NAME).setRetryPause(100, TimeUnit.MILLISECONDS)
+ .setMaxRetries(30).build();
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
// move back
@@ -117,9 +101,9 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
@Test
public void testMaxRetries() throws IOException, InterruptedException {
- initConn(0, 10, 2);
try {
- asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
+ CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.DAYS)
+ .maxAttempts(3).pause(10, TimeUnit.MILLISECONDS)
.action((controller, loc, stub) -> failedFuture()).call().get();
fail();
} catch (ExecutionException e) {
@@ -129,14 +113,14 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
@Test
public void testOperationTimeout() throws IOException, InterruptedException {
- initConn(0, 100, Integer.MAX_VALUE);
long startNs = System.nanoTime();
try {
- asyncConn.callerFactory.single().table(TABLE_NAME).row(ROW)
- .operationTimeout(1, TimeUnit.SECONDS).action((controller, loc, stub) -> failedFuture())
- .call().get();
+ CONN.callerFactory.single().table(TABLE_NAME).row(ROW).operationTimeout(1, TimeUnit.SECONDS)
+ .pause(100, TimeUnit.MILLISECONDS).maxAttempts(Integer.MAX_VALUE)
+ .action((controller, loc, stub) -> failedFuture()).call().get();
fail();
} catch (ExecutionException e) {
+ e.printStackTrace();
assertThat(e.getCause(), instanceOf(RetriesExhaustedException.class));
}
long costNs = System.nanoTime() - startNs;
@@ -146,12 +130,11 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
@Test
public void testLocateError() throws IOException, InterruptedException, ExecutionException {
- initConn(0, 100, 5);
AtomicBoolean errorTriggered = new AtomicBoolean(false);
AtomicInteger count = new AtomicInteger(0);
- HRegionLocation loc = asyncConn.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
+ HRegionLocation loc = CONN.getRegionLocator(TABLE_NAME).getRegionLocation(ROW).get();
AsyncRegionLocator mockedLocator =
- new AsyncRegionLocator(asyncConn, AsyncConnectionImpl.RETRY_TIMER) {
+ new AsyncRegionLocator(CONN, AsyncConnectionImpl.RETRY_TIMER) {
@Override
CompletableFuture<HRegionLocation> getRegionLocation(TableName tableName, byte[] row,
RegionLocateType locateType, long timeoutNs) {
@@ -174,14 +157,15 @@ public class TestAsyncSingleRequestRpcRetryingCaller {
}
};
try (AsyncConnectionImpl mockedConn =
- new AsyncConnectionImpl(asyncConn.getConfiguration(), User.getCurrent()) {
+ new AsyncConnectionImpl(CONN.getConfiguration(), User.getCurrent()) {
@Override
AsyncRegionLocator getLocator() {
return mockedLocator;
}
}) {
- RawAsyncTable table = new RawAsyncTableImpl(mockedConn, TABLE_NAME);
+ RawAsyncTable table = mockedConn.getRawTableBuilder(TABLE_NAME)
+ .setRetryPause(100, TimeUnit.MILLISECONDS).setMaxRetries(5).build();
table.put(new Put(ROW).addColumn(FAMILY, QUALIFIER, VALUE)).get();
assertTrue(errorTriggered.get());
errorTriggered.set(false);
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
index 82fe3cd..880114a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableGetMultiThreaded.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hbase.client;
import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_META_OPERATION_TIMEOUT;
-import static org.apache.hadoop.hbase.HConstants.HBASE_CLIENT_RETRIES_NUMBER;
-import static org.apache.hadoop.hbase.HConstants.HBASE_RPC_READ_TIMEOUT_KEY;
import static org.apache.hadoop.hbase.master.balancer.BaseLoadBalancer.TABLES_ON_MASTER;
import static org.junit.Assert.assertEquals;
@@ -33,6 +31,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -72,6 +71,8 @@ public class TestAsyncTableGetMultiThreaded {
private static AsyncConnection CONN;
+ private static RawAsyncTable TABLE;
+
private static byte[][] SPLIT_KEYS;
@BeforeClass
@@ -79,14 +80,13 @@ public class TestAsyncTableGetMultiThreaded {
setUp(HColumnDescriptor.MemoryCompaction.NONE);
}
- protected static void setUp(HColumnDescriptor.MemoryCompaction memoryCompaction) throws Exception {
+ protected static void setUp(HColumnDescriptor.MemoryCompaction memoryCompaction)
+ throws Exception {
TEST_UTIL.getConfiguration().set(TABLES_ON_MASTER, "none");
TEST_UTIL.getConfiguration().setLong(HBASE_CLIENT_META_OPERATION_TIMEOUT, 60000L);
- TEST_UTIL.getConfiguration().setLong(HBASE_RPC_READ_TIMEOUT_KEY, 1000L);
- TEST_UTIL.getConfiguration().setInt(HBASE_CLIENT_RETRIES_NUMBER, 1000);
TEST_UTIL.getConfiguration().setInt(ByteBufferPool.MAX_POOL_SIZE_KEY, 100);
TEST_UTIL.getConfiguration().set(CompactingMemStore.COMPACTING_MEMSTORE_TYPE_KEY,
- String.valueOf(memoryCompaction));
+ String.valueOf(memoryCompaction));
TEST_UTIL.startMiniCluster(5);
SPLIT_KEYS = new byte[8][];
@@ -96,10 +96,11 @@ public class TestAsyncTableGetMultiThreaded {
TEST_UTIL.createTable(TABLE_NAME, FAMILY);
TEST_UTIL.waitTableAvailable(TABLE_NAME);
CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration());
- CONN.getRawTable(TABLE_NAME)
- .putAll(
- IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
- .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))).collect(Collectors.toList()))
+ TABLE = CONN.getRawTableBuilder(TABLE_NAME).setReadRpcTimeout(1, TimeUnit.SECONDS)
+ .setMaxRetries(1000).build();
+ TABLE.putAll(
+ IntStream.range(0, COUNT).mapToObj(i -> new Put(Bytes.toBytes(String.format("%03d", i)))
+ .addColumn(FAMILY, QUALIFIER, Bytes.toBytes(i))).collect(Collectors.toList()))
.get();
}
@@ -112,11 +113,8 @@ public class TestAsyncTableGetMultiThreaded {
private void run(AtomicBoolean stop) throws InterruptedException, ExecutionException {
while (!stop.get()) {
for (int i = 0; i < COUNT; i++) {
- assertEquals(i,
- Bytes.toInt(
- CONN.getRawTable(TABLE_NAME).get(new Get(Bytes.toBytes(String.format("%03d", i))))
- .get()
- .getValue(FAMILY, QUALIFIER)));
+ assertEquals(i, Bytes.toInt(TABLE.get(new Get(Bytes.toBytes(String.format("%03d", i))))
+ .get().getValue(FAMILY, QUALIFIER)));
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4ab95ebb/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
index 270e3e1..9f3970b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncTableScan.java
@@ -25,7 +25,6 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
-import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.hadoop.hbase.testclassification.ClientTests;
@@ -122,10 +121,7 @@ public class TestRawAsyncTableScan extends AbstractTestAsyncTableScan {
@Override
protected List<Result> doScan(Scan scan) throws Exception {
SimpleRawScanResultConsumer scanConsumer = new SimpleRawScanResultConsumer();
- RawAsyncTable table = ASYNC_CONN.getRawTable(TABLE_NAME);
- table.setScanTimeout(1, TimeUnit.HOURS);
- table.setReadRpcTimeout(1, TimeUnit.HOURS);
- table.scan(scan, scanConsumer);
+ ASYNC_CONN.getRawTable(TABLE_NAME).scan(scan, scanConsumer);
List<Result> results = new ArrayList<>();
for (Result result; (result = scanConsumer.take()) != null;) {
results.add(result);