You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2016/10/13 08:13:05 UTC
hbase git commit: HBASE-16664 Timeout logic in AsyncProcess is broken
Repository: hbase
Updated Branches:
refs/heads/master f11aa4542 -> 88ff71b91
HBASE-16664 Timeout logic in AsyncProcess is broken
Signed-off-by: chenheng <ch...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/88ff71b9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/88ff71b9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/88ff71b9
Branch: refs/heads/master
Commit: 88ff71b91b086984fdc5b8707d134a1d475e5103
Parents: f11aa45
Author: Phil Yang <ud...@gmail.com>
Authored: Sun Oct 9 15:25:11 2016 +0800
Committer: chenheng <ch...@apache.org>
Committed: Thu Oct 13 16:15:43 2016 +0800
----------------------------------------------------------------------
.../hadoop/hbase/client/AsyncProcess.java | 38 +++--
.../hbase/client/AsyncRequestFutureImpl.java | 64 +++----
.../hadoop/hbase/client/BufferedMutator.java | 10 ++
.../hbase/client/BufferedMutatorImpl.java | 20 ++-
.../client/CancellableRegionServerCallable.java | 22 ++-
.../hbase/client/ConnectionImplementation.java | 8 +-
.../org/apache/hadoop/hbase/client/HTable.java | 48 +++---
.../hadoop/hbase/client/HTableMultiplexer.java | 6 +-
.../hbase/client/MultiServerCallable.java | 15 +-
.../client/NoncedRegionServerCallable.java | 2 +-
.../hbase/client/RetryingTimeTracker.java | 3 +-
.../RpcRetryingCallerWithReadReplicas.java | 14 +-
.../hadoop/hbase/client/TestAsyncProcess.java | 39 +++--
.../hbase/client/HConnectionTestingUtility.java | 4 +-
.../org/apache/hadoop/hbase/client/TestHCM.java | 167 +++++++++++++++++--
15 files changed, 338 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index f2d9546..abefc46 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.client;
+import com.google.common.annotations.VisibleForTesting;
+
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
@@ -55,8 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* This class allows a continuous flow of requests. It's written to be compatible with a
* synchronous caller such as HTable.
@@ -212,7 +212,8 @@ class AsyncProcess {
protected final long pause;
protected int numTries;
protected int serverTrackerTimeout;
- protected int timeout;
+ protected int rpcTimeout;
+ protected int operationTimeout;
protected long primaryCallTimeoutMicroseconds;
/** Whether to log details for batch errors */
protected final boolean logBatchErrorDetails;
@@ -220,7 +221,7 @@ class AsyncProcess {
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors,
- RpcControllerFactory rpcFactory, int rpcTimeout) {
+ RpcControllerFactory rpcFactory, int rpcTimeout, int operationTimeout) {
if (hc == null) {
throw new IllegalArgumentException("ClusterConnection cannot be null.");
}
@@ -236,7 +237,8 @@ class AsyncProcess {
// how many times we could try in total, one more than retry number
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
- this.timeout = rpcTimeout;
+ this.rpcTimeout = rpcTimeout;
+ this.operationTimeout = operationTimeout;
this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
@@ -434,7 +436,7 @@ class AsyncProcess {
List<Integer> locationErrorRows, Map<ServerName, MultiAction<Row>> actionsByServer,
ExecutorService pool) {
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
- tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, timeout);
+ tableName, retainedActions, nonceGroup, pool, callback, results, needResults, null, -1);
// Add location errors if any
if (locationErrors != null) {
for (int i = 0; i < locationErrors.size(); ++i) {
@@ -448,6 +450,14 @@ class AsyncProcess {
return ars;
}
+ public void setRpcTimeout(int rpcTimeout) {
+ this.rpcTimeout = rpcTimeout;
+ }
+
+ public void setOperationTimeout(int operationTimeout) {
+ this.operationTimeout = operationTimeout;
+ }
+
/**
* Helper that is used when grouping the actions per region server.
*
@@ -473,7 +483,7 @@ class AsyncProcess {
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results) {
- return submitAll(pool, tableName, rows, callback, results, null, timeout);
+ return submitAll(pool, tableName, rows, callback, results, null, -1);
}
/**
* Submit immediately the list of rows, whatever the server status. Kept for backward
@@ -484,10 +494,11 @@ class AsyncProcess {
* @param rows the list of rows.
* @param callback the callback.
* @param results Optional array to return the results thru; backward compat.
+ * @param rpcTimeout rpc timeout for this batch, set -1 if want to use current setting.
*/
public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
- CancellableRegionServerCallable callable, int curTimeout) {
+ CancellableRegionServerCallable callable, int rpcTimeout) {
List<Action<Row>> actions = new ArrayList<Action<Row>>(rows.size());
// The position will be used by the processBatch to match the object array returned.
@@ -507,7 +518,7 @@ class AsyncProcess {
}
AsyncRequestFutureImpl<CResult> ars = createAsyncRequestFuture(
tableName, actions, ng.getNonceGroup(), getPool(pool), callback, results, results != null,
- callable, curTimeout);
+ callable, rpcTimeout);
ars.groupAndSendMultiAction(actions, 1);
return ars;
}
@@ -520,10 +531,11 @@ class AsyncProcess {
protected <CResult> AsyncRequestFutureImpl<CResult> createAsyncRequestFuture(
TableName tableName, List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
Batch.Callback<CResult> callback, Object[] results, boolean needResults,
- CancellableRegionServerCallable callable, int curTimeout) {
+ CancellableRegionServerCallable callable, int rpcTimeout) {
return new AsyncRequestFutureImpl<CResult>(
tableName, actions, nonceGroup, getPool(pool), needResults,
- results, callback, callable, curTimeout, this);
+ results, callback, callable, operationTimeout,
+ rpcTimeout > 0 ? rpcTimeout : this.rpcTimeout, this);
}
/** Wait until the async does not have more than max tasks in progress. */
@@ -664,8 +676,8 @@ class AsyncProcess {
*/
@VisibleForTesting
protected RpcRetryingCaller<AbstractResponse> createCaller(
- CancellableRegionServerCallable callable) {
- return rpcCallerFactory.<AbstractResponse> newCaller();
+ CancellableRegionServerCallable callable, int rpcTimeout) {
+ return rpcCallerFactory.<AbstractResponse> newCaller(rpcTimeout);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index 3894d58..a2642f3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -20,6 +20,24 @@
package org.apache.hadoop.hbase.client;
import com.google.common.annotations.VisibleForTesting;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.DoNotRetryIOException;
@@ -39,23 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.htrace.Trace;
-import java.io.IOException;
-import java.io.InterruptedIOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
/**
* The context, and return value, for a single submit/submitAll call.
* Note on how this class (one AP submit) works. Initially, all requests are split into groups
@@ -70,6 +71,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
private static final Log LOG = LogFactory.getLog(AsyncRequestFutureImpl.class);
+ private RetryingTimeTracker tracker;
+
/**
* Runnable (that can be submitted to thread pool) that waits for when it's time
* to issue replica calls, finds region replicas, groups the requests by replica and
@@ -219,12 +222,12 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
if (callable == null) {
callable = createCallable(server, tableName, multiAction);
}
- RpcRetryingCaller<AbstractResponse> caller = asyncProcess.createCaller(callable);
+ RpcRetryingCaller<AbstractResponse> caller = asyncProcess.createCaller(callable,rpcTimeout);
try {
if (callsInProgress != null) {
callsInProgress.add(callable);
}
- res = caller.callWithoutRetries(callable, currentCallTotalTimeout);
+ res = caller.callWithoutRetries(callable, operationTimeout);
if (res == null) {
// Cancelled
return;
@@ -297,7 +300,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
private final boolean hasAnyReplicaGets;
private final long nonceGroup;
private CancellableRegionServerCallable currentCallable;
- private int currentCallTotalTimeout;
+ private int operationTimeout;
+ private int rpcTimeout;
private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
protected AsyncProcess asyncProcess;
@@ -337,10 +341,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
public AsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
- ExecutorService pool, boolean needResults, Object[] results,
- Batch.Callback<CResult> callback,
- CancellableRegionServerCallable callable, int timeout,
- AsyncProcess asyncProcess) {
+ ExecutorService pool, boolean needResults, Object[] results, Batch.Callback<CResult> callback,
+ CancellableRegionServerCallable callable, int operationTimeout, int rpcTimeout,
+ AsyncProcess asyncProcess) {
this.pool = pool;
this.callback = callback;
this.nonceGroup = nonceGroup;
@@ -410,9 +413,12 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
this.errorsByServer = createServerErrorTracker();
this.errors = (asyncProcess.globalErrors != null)
? asyncProcess.globalErrors : new BatchErrors();
+ this.operationTimeout = operationTimeout;
+ this.rpcTimeout = rpcTimeout;
this.currentCallable = callable;
- this.currentCallTotalTimeout = timeout;
-
+ if (callable == null) {
+ tracker = new RetryingTimeTracker().start();
+ }
}
@VisibleForTesting
@@ -1281,9 +1287,9 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
/**
* Create a callable. Isolated to be easily overridden in the tests.
*/
- private MultiServerCallable<Row> createCallable(final ServerName server,
- TableName tableName, final MultiAction<Row> multi) {
+ private MultiServerCallable<Row> createCallable(final ServerName server, TableName tableName,
+ final MultiAction<Row> multi) {
return new MultiServerCallable<Row>(asyncProcess.connection, tableName, server,
- multi, asyncProcess.rpcFactory.newController());
+ multi, asyncProcess.rpcFactory.newController(), rpcTimeout, tracker);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
index 5dc7fc3..fcc9af7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutator.java
@@ -120,6 +120,16 @@ public interface BufferedMutator extends Closeable {
long getWriteBufferSize();
/**
+ * Set rpc timeout for this mutator instance
+ */
+ void setRpcTimeout(int timeout);
+
+ /**
+ * Set operation timeout for this mutator instance
+ */
+ void setOperationTimeout(int timeout);
+
+ /**
* Listens for asynchronous exceptions on a {@link BufferedMutator}.
*/
@InterfaceAudience.Public
http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index 2d4c8b3..f7eb09d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -82,6 +82,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
private boolean closed = false;
private final ExecutorService pool;
private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
+ private int operationTimeout;
@VisibleForTesting
protected AsyncProcess ap; // non-final so can be overridden in test
@@ -107,9 +108,12 @@ public class BufferedMutatorImpl implements BufferedMutator {
this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
-
+ this.operationTimeout = conn.getConfiguration().getInt(
+ HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+ HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
// puts need to track errors globally due to how the APIs currently work.
- ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout);
+ ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory,
+ writeRpcTimeout, operationTimeout);
}
@Override
@@ -282,6 +286,18 @@ public class BufferedMutatorImpl implements BufferedMutator {
return this.writeBufferSize;
}
+ @Override
+ public void setRpcTimeout(int timeout) {
+ this.writeRpcTimeout = timeout;
+ ap.setRpcTimeout(timeout);
+ }
+
+ @Override
+ public void setOperationTimeout(int timeout) {
+ this.operationTimeout = timeout;
+ ap.setOperationTimeout(operationTimeout);
+ }
+
private class QueueRowAccess implements RowAccess<Row> {
private int remainder = undealtMutationCount.getAndSet(0);
http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
index 69f5b55..a0ff900 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/CancellableRegionServerCallable.java
@@ -30,15 +30,20 @@ import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
* This class is used to unify HTable calls with AsyncProcess Framework. HTable can use
* AsyncProcess directly though this class. Also adds global timeout tracking on top of
* RegionServerCallable and implements Cancellable.
+ * Global timeout tracking conflicts with logic in RpcRetryingCallerImpl's callWithRetries. So you
+ * can only use this callable in AsyncProcess which only uses callWithoutRetries and retries in its
+ * own implementation.
*/
@InterfaceAudience.Private
abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<T> implements
Cancellable {
- private final RetryingTimeTracker tracker = new RetryingTimeTracker();
-
+ private final RetryingTimeTracker tracker;
+ private final int rpcTimeout;
CancellableRegionServerCallable(Connection connection, TableName tableName, byte[] row,
- RpcController rpcController) {
+ RpcController rpcController, int rpcTimeout, RetryingTimeTracker tracker) {
super(connection, tableName, row, rpcController);
+ this.rpcTimeout = rpcTimeout;
+ this.tracker = tracker;
}
/* Override so can mess with the callTimeout.
@@ -46,7 +51,7 @@ abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<
* @see org.apache.hadoop.hbase.client.RegionServerCallable#rpcCall(int)
*/
@Override
- public T call(int callTimeout) throws IOException {
+ public T call(int operationTimeout) throws IOException {
if (isCancelled()) return null;
if (Thread.interrupted()) {
throw new InterruptedIOException();
@@ -54,11 +59,12 @@ abstract class CancellableRegionServerCallable<T> extends ClientServiceCallable<
// It is expected (it seems) that tracker.start can be called multiple times (on each trip
// through the call when retrying). Also, we can call start and no need of a stop.
this.tracker.start();
- int remainingTime = tracker.getRemainingTime(callTimeout);
- if (remainingTime == 0) {
- throw new DoNotRetryIOException("Timeout for mutate row");
+ int remainingTime = tracker.getRemainingTime(operationTimeout);
+ if (remainingTime <= 1) {
+ // "1" is a special return value in RetryingTimeTracker, see its implementation.
+ throw new DoNotRetryIOException("Operation rpcTimeout");
}
- return super.call(remainingTime);
+ return super.call(Math.min(rpcTimeout, remainingTime));
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 8db9dbf..9cf63dc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -1831,8 +1831,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
// For tests to override.
protected AsyncProcess createAsyncProcess(Configuration conf) {
// No default pool available.
- int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
- return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, rpcTimeout);
+ int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+ HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+ return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory,
+ rpcTimeout, operationTimeout);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 84f8024..2802a2c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -441,7 +441,7 @@ public class HTable implements Table {
RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
rpcControllerFactory, tableName, this.connection, get, pool,
connConfiguration.getRetriesNumber(),
- operationTimeout,
+ operationTimeout, readRpcTimeout,
connConfiguration.getPrimaryCallTimeoutMicroSecond());
return callable.call(operationTimeout);
}
@@ -479,15 +479,10 @@ public class HTable implements Table {
batch(actions, results, -1);
}
- public void batch(final List<? extends Row> actions, final Object[] results, int timeout)
+ public void batch(final List<? extends Row> actions, final Object[] results, int rpcTimeout)
throws InterruptedException, IOException {
- AsyncRequestFuture ars = null;
- if (timeout != -1) {
- ars = multiAp.submitAll(pool, tableName, actions, null, results, null, timeout);
- } else {
- // use default timeout in AP
- ars = multiAp.submitAll(pool, tableName, actions, null, results);
- }
+ AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null,
+ rpcTimeout);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -523,7 +518,8 @@ public class HTable implements Table {
throws IOException {
CancellableRegionServerCallable<SingleResponse> callable =
new CancellableRegionServerCallable<SingleResponse>(
- connection, getName(), delete.getRow(), this.rpcControllerFactory.newController()) {
+ connection, getName(), delete.getRow(), this.rpcControllerFactory.newController(),
+ writeRpcTimeout, new RetryingTimeTracker().start()) {
@Override
protected SingleResponse rpcCall() throws Exception {
MutateRequest request = RequestConverter.buildMutateRequest(
@@ -535,7 +531,7 @@ public class HTable implements Table {
List<Row> rows = new ArrayList<Row>();
rows.add(delete);
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
- null, null, callable, operationTimeout);
+ null, null, callable, writeRpcTimeout);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -593,7 +589,7 @@ public class HTable implements Table {
public void mutateRow(final RowMutations rm) throws IOException {
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(this.connection, getName(), rm.getRow(),
- rpcControllerFactory.newController()) {
+ rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()){
@Override
protected MultiResponse rpcCall() throws Exception {
RegionAction.Builder regionMutationBuilder = RequestConverter.buildRegionAction(
@@ -614,7 +610,7 @@ public class HTable implements Table {
}
};
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
- null, null, callable, operationTimeout);
+ null, null, callable, writeRpcTimeout);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -798,7 +794,8 @@ public class HTable implements Table {
throws IOException {
CancellableRegionServerCallable<SingleResponse> callable =
new CancellableRegionServerCallable<SingleResponse>(
- this.connection, getName(), row, this.rpcControllerFactory.newController()) {
+ this.connection, getName(), row, this.rpcControllerFactory.newController(),
+ writeRpcTimeout, new RetryingTimeTracker().start()) {
@Override
protected SingleResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(compareOp.name());
@@ -814,7 +811,7 @@ public class HTable implements Table {
Object[] results = new Object[1];
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rows,
- null, results, callable, operationTimeout);
+ null, results, callable, -1);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -831,7 +828,7 @@ public class HTable implements Table {
throws IOException {
CancellableRegionServerCallable<MultiResponse> callable =
new CancellableRegionServerCallable<MultiResponse>(connection, getName(), rm.getRow(),
- rpcControllerFactory.newController()) {
+ rpcControllerFactory.newController(), writeRpcTimeout, new RetryingTimeTracker().start()) {
@Override
protected MultiResponse rpcCall() throws Exception {
CompareType compareType = CompareType.valueOf(compareOp.name());
@@ -858,7 +855,7 @@ public class HTable implements Table {
* */
Object[] results = new Object[rm.getMutations().size()];
AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, rm.getMutations(),
- null, results, callable, operationTimeout);
+ null, results, callable, -1);
ars.waitUntilDone();
if (ars.hasError()) {
throw ars.getErrors();
@@ -1117,6 +1114,10 @@ public class HTable implements Table {
@Override
public void setOperationTimeout(int operationTimeout) {
this.operationTimeout = operationTimeout;
+ if (mutator != null) {
+ mutator.setOperationTimeout(operationTimeout);
+ }
+ multiAp.setOperationTimeout(operationTimeout);
}
@Override
@@ -1133,8 +1134,8 @@ public class HTable implements Table {
@Override
@Deprecated
public void setRpcTimeout(int rpcTimeout) {
- this.readRpcTimeout = rpcTimeout;
- this.writeRpcTimeout = rpcTimeout;
+ setReadRpcTimeout(rpcTimeout);
+ setWriteRpcTimeout(rpcTimeout);
}
@Override
@@ -1145,6 +1146,10 @@ public class HTable implements Table {
@Override
public void setWriteRpcTimeout(int writeRpcTimeout) {
this.writeRpcTimeout = writeRpcTimeout;
+ if (mutator != null) {
+ mutator.setRpcTimeout(writeRpcTimeout);
+ }
+ multiAp.setRpcTimeout(writeRpcTimeout);
}
@Override
@@ -1229,7 +1234,8 @@ public class HTable implements Table {
AsyncProcess asyncProcess =
new AsyncProcess(connection, configuration, pool,
RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
- true, RpcControllerFactory.instantiate(configuration), readRpcTimeout);
+ true, RpcControllerFactory.instantiate(configuration), readRpcTimeout,
+ operationTimeout);
AsyncRequestFuture future = asyncProcess.submitAll(null, tableName, execs,
new Callback<ClientProtos.CoprocessorServiceResult>() {
@@ -1281,6 +1287,8 @@ public class HTable implements Table {
.maxKeyValueSize(connConfiguration.getMaxKeyValueSize())
);
}
+ mutator.setRpcTimeout(writeRpcTimeout);
+ mutator.setOperationTimeout(operationTimeout);
return mutator;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index 2c1a61e..e8379ef 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -442,6 +442,7 @@ public class HTableMultiplexer {
private final int maxRetryInQueue;
private final AtomicInteger retryInQueue = new AtomicInteger(0);
private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
+ private final int operationTimeout;
public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
@@ -454,7 +455,10 @@ public class HTableMultiplexer {
this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
- this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout);
+ this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+ HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+ this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory,
+ writeRpcTimeout, operationTimeout);
this.executor = executor;
this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
index 6067ef0..7d50a27 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MultiServerCallable.java
@@ -50,12 +50,13 @@ import com.google.common.annotations.VisibleForTesting;
*/
@InterfaceAudience.Private
class MultiServerCallable<R> extends CancellableRegionServerCallable<MultiResponse> {
- private final MultiAction<R> multiAction;
- private final boolean cellBlock;
+ private MultiAction<R> multiAction;
+ private boolean cellBlock;
MultiServerCallable(final ClusterConnection connection, final TableName tableName,
- final ServerName location, final MultiAction<R> multi, RpcController rpcController) {
- super(connection, tableName, null, rpcController);
+ final ServerName location, final MultiAction<R> multi, RpcController rpcController,
+ int rpcTimeout, RetryingTimeTracker tracker) {
+ super(connection, tableName, null, rpcController, rpcTimeout, tracker);
this.multiAction = multi;
// RegionServerCallable has HRegionLocation field, but this is a multi-region request.
// Using region info from parent HRegionLocation would be a mistake for this class; so
@@ -64,6 +65,12 @@ class MultiServerCallable<R> extends CancellableRegionServerCallable<MultiRespon
this.cellBlock = isCellBlock();
}
+ public void reset(ServerName location, MultiAction<R> multiAction) {
+ this.location = new HRegionLocation(null, location);
+ this.multiAction = multiAction;
+ this.cellBlock = isCellBlock();
+ }
+
@Override
protected HRegionLocation getLocation() {
throw new RuntimeException("Cannot get region location for multi-region request");
http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
index aff0205..52ed263 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/NoncedRegionServerCallable.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
* @param <T> the class that the ServerCallable handles
*/
@InterfaceAudience.Private
-public abstract class NoncedRegionServerCallable<T> extends CancellableRegionServerCallable<T> {
+public abstract class NoncedRegionServerCallable<T> extends ClientServiceCallable<T> {
private final long nonce;
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
index b9438e6..e804e92 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RetryingTimeTracker.java
@@ -24,10 +24,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
class RetryingTimeTracker {
private long globalStartTime = -1;
- public void start() {
+ public RetryingTimeTracker start() {
if (this.globalStartTime < 0) {
this.globalStartTime = EnvironmentEdgeManager.currentTime();
}
+ return this;
}
public int getRemainingTime(int callTimeout) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 04553d2..a290c78 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -58,7 +58,8 @@ public class RpcRetryingCallerWithReadReplicas {
protected final Get get;
protected final TableName tableName;
protected final int timeBeforeReplicas;
- private final int callTimeout;
+ private final int operationTimeout;
+ private final int rpcTimeout;
private final int retries;
private final RpcControllerFactory rpcControllerFactory;
private final RpcRetryingCallerFactory rpcRetryingCallerFactory;
@@ -66,7 +67,7 @@ public class RpcRetryingCallerWithReadReplicas {
public RpcRetryingCallerWithReadReplicas(
RpcControllerFactory rpcControllerFactory, TableName tableName,
ClusterConnection cConnection, final Get get,
- ExecutorService pool, int retries, int callTimeout,
+ ExecutorService pool, int retries, int operationTimeout, int rpcTimeout,
int timeBeforeReplicas) {
this.rpcControllerFactory = rpcControllerFactory;
this.tableName = tableName;
@@ -75,7 +76,8 @@ public class RpcRetryingCallerWithReadReplicas {
this.get = get;
this.pool = pool;
this.retries = retries;
- this.callTimeout = callTimeout;
+ this.operationTimeout = operationTimeout;
+ this.rpcTimeout = rpcTimeout;
this.timeBeforeReplicas = timeBeforeReplicas;
this.rpcRetryingCallerFactory = new RpcRetryingCallerFactory(conf);
}
@@ -91,7 +93,7 @@ public class RpcRetryingCallerWithReadReplicas {
public ReplicaRegionServerCallable(int id, HRegionLocation location) {
super(RpcRetryingCallerWithReadReplicas.this.cConnection,
RpcRetryingCallerWithReadReplicas.this.tableName, get.getRow(),
- rpcControllerFactory.newController());
+ rpcControllerFactory.newController(), rpcTimeout, new RetryingTimeTracker());
this.id = id;
this.location = location;
}
@@ -133,7 +135,7 @@ public class RpcRetryingCallerWithReadReplicas {
ClientProtos.GetRequest request = RequestConverter.buildGetRequest(reg, get);
HBaseRpcController hrc = (HBaseRpcController)getRpcController();
hrc.reset();
- hrc.setCallTimeout(callTimeout);
+ hrc.setCallTimeout(rpcTimeout);
hrc.setPriority(tableName);
ClientProtos.GetResponse response = getStub().get(hrc, request);
if (response == null) {
@@ -258,7 +260,7 @@ public class RpcRetryingCallerWithReadReplicas {
for (int id = min; id <= max; id++) {
HRegionLocation hrl = rl.getRegionLocation(id);
ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
- cs.submit(callOnReplica, callTimeout, id);
+ cs.submit(callOnReplica, operationTimeout, id);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index 0703e51..ed521a3 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -152,7 +152,10 @@ public class TestAsyncProcess {
final AtomicInteger nbActions = new AtomicInteger();
public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
public AtomicInteger callsCt = new AtomicInteger();
- private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+ HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
private long previousTimeout = -1;
@Override
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
@@ -162,7 +165,7 @@ public class TestAsyncProcess {
// Test HTable has tableName of null, so pass DUMMY_TABLE
AsyncRequestFutureImpl<Res> r = new MyAsyncRequestFutureImpl<Res>(
DUMMY_TABLE, actions, nonceGroup, getPool(pool), needResults,
- results, callback, callable, curTimeout, this);
+ results, callback, callable, operationTimeout, rpcTimeout, this);
allReqs.add(r);
return r;
}
@@ -174,14 +177,16 @@ public class TestAsyncProcess {
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
- new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout);
+ new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout,
+ operationTimeout);
}
public MyAsyncProcess(
ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
- new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout);
+ new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf),
+ rpcTimeout, operationTimeout);
}
public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
@@ -193,7 +198,8 @@ public class TestAsyncProcess {
throw new RejectedExecutionException("test under failure");
}
},
- new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout);
+ new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf),
+ rpcTimeout, operationTimeout);
}
@Override
@@ -213,7 +219,7 @@ public class TestAsyncProcess {
}
@Override
protected RpcRetryingCaller<AbstractResponse> createCaller(
- CancellableRegionServerCallable callable) {
+ CancellableRegionServerCallable callable, int rpcTimeout) {
callsCt.incrementAndGet();
MultiServerCallable callable1 = (MultiServerCallable) callable;
final MultiResponse mr = createMultiResponse(
@@ -254,12 +260,11 @@ public class TestAsyncProcess {
static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
public MyAsyncRequestFutureImpl(TableName tableName, List<Action<Row>> actions, long nonceGroup,
- ExecutorService pool, boolean needResults, Object[] results,
- Batch.Callback callback,
- CancellableRegionServerCallable callable, int timeout,
- AsyncProcess asyncProcess) {
+ ExecutorService pool, boolean needResults, Object[] results,
+ Batch.Callback callback, CancellableRegionServerCallable callable, int operationTimeout,
+ int rpcTimeout, AsyncProcess asyncProcess) {
super(tableName, actions, nonceGroup, pool, needResults,
- results, callback, callable, timeout, asyncProcess);
+ results, callback, callable, operationTimeout, rpcTimeout, asyncProcess);
}
@Override
@@ -299,7 +304,7 @@ public class TestAsyncProcess {
@Override
protected RpcRetryingCaller<AbstractResponse> createCaller(
- CancellableRegionServerCallable callable) {
+ CancellableRegionServerCallable callable, int rpcTimeout) {
callsCt.incrementAndGet();
return new CallerWithFailure(ioe);
}
@@ -351,7 +356,7 @@ public class TestAsyncProcess {
@Override
protected RpcRetryingCaller<AbstractResponse> createCaller(
- CancellableRegionServerCallable payloadCallable) {
+ CancellableRegionServerCallable payloadCallable, int rpcTimeout) {
MultiServerCallable<Row> callable = (MultiServerCallable) payloadCallable;
final MultiResponse mr = createMultiResponse(
callable.getMulti(), nbMultiResponse, nbActions, new ResponseGenerator() {
@@ -1638,12 +1643,14 @@ public class TestAsyncProcess {
}
static class AsyncProcessForThrowableCheck extends AsyncProcess {
- private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-
+ private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ private static int operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+ HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf,
ExecutorService pool) {
super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
- conf), rpcTimeout);
+ conf), rpcTimeout, operationTimeout);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index 658fa96..ee89609 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -128,7 +128,9 @@ public class HConnectionTestingUtility {
Mockito.when(c.getAsyncProcess()).thenReturn(
new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT)));
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT), conf.getInt(
+ HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+ HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT)));
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
RpcRetryingCallerFactory.instantiate(conf,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null));
http://git-wip-us.apache.org/repos/asf/hbase/blob/88ff71b9/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index f9ebc47..3416e54 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -135,12 +135,42 @@ public class TestHCM {
@Override
public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
- final Get get, final List<Cell> results) throws IOException {
+ final Get get, final List<Cell> results) throws IOException {
+ Threads.sleep(sleepTime.get());
+ if (ct.incrementAndGet() == 1) {
+ throw new IOException("first call I fail");
+ }
+ }
+
+ @Override
+ public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Put put, final WALEdit edit, final Durability durability) throws IOException {
Threads.sleep(sleepTime.get());
- if (ct.incrementAndGet() == 1){
+ if (ct.incrementAndGet() == 1) {
throw new IOException("first call I fail");
}
}
+
+ @Override
+ public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Delete delete,
+ final WALEdit edit, final Durability durability) throws IOException {
+ Threads.sleep(sleepTime.get());
+ if (ct.incrementAndGet() == 1) {
+ throw new IOException("first call I fail");
+ }
+ }
+
+ @Override
+ public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Increment increment) throws IOException {
+ Threads.sleep(sleepTime.get());
+ if (ct.incrementAndGet() == 1) {
+ throw new IOException("first call I fail");
+ }
+ return super.preIncrement(e, increment);
+ }
+
}
public static class SleepCoprocessor extends BaseRegionObserver {
@@ -156,16 +186,20 @@ public class TestHCM {
final Put put, final WALEdit edit, final Durability durability) throws IOException {
Threads.sleep(SLEEP_TIME);
}
- }
- public static class SleepWriteCoprocessor extends BaseRegionObserver {
- public static final int SLEEP_TIME = 5000;
@Override
public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
final Increment increment) throws IOException {
Threads.sleep(SLEEP_TIME);
return super.preIncrement(e, increment);
}
+
+ @Override
+ public void preDelete(final ObserverContext<RegionCoprocessorEnvironment> e, final Delete delete,
+ final WALEdit edit, final Durability durability) throws IOException {
+ Threads.sleep(SLEEP_TIME);
+ }
+
}
public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver {
@@ -364,11 +398,12 @@ public class TestHCM {
* timeouted when the server answers.
*/
@Test
- public void testOperationTimeout() throws Exception {
- HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testOperationTimeout");
+ public void testGetOperationTimeout() throws Exception {
+ HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetOperationTimeout");
hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
- Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM});
+ Table table = TEST_UTIL.createTable(hdt, new byte[][]{FAM_NAM}, TEST_UTIL.getConfiguration());
table.setRpcTimeout(Integer.MAX_VALUE);
+ SleepAndFailFirstTime.ct.set(0);
// Check that it works if the timeout is big enough
table.setOperationTimeout(120 * 1000);
table.get(new Get(FAM_NAM));
@@ -392,6 +427,64 @@ public class TestHCM {
}
@Test
+ public void testPutOperationTimeout() throws Exception {
+ HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutOperationTimeout");
+ hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
+ Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration());
+ table.setRpcTimeout(Integer.MAX_VALUE);
+ SleepAndFailFirstTime.ct.set(0);
+ // Check that it works if the timeout is big enough
+ table.setOperationTimeout(120 * 1000);
+ table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM));
+
+ // Resetting and retrying. Will fail this time, not enough time for the second try
+ SleepAndFailFirstTime.ct.set(0);
+ try {
+ table.setOperationTimeout(30 * 1000);
+ table.put(new Put(FAM_NAM).addColumn(FAM_NAM, FAM_NAM, FAM_NAM));
+ Assert.fail("We expect an exception here");
+ } catch (RetriesExhaustedWithDetailsException e) {
+ // The client has a CallTimeout class, but it's not shared.We're not very clean today,
+ // in the general case you can expect the call to stop, but the exception may vary.
+ // In this test however, we're sure that it will be a socket timeout.
+ LOG.info("We received an exception, as expected ", e);
+ } catch (IOException e) {
+ Assert.fail("Wrong exception:" + e.getMessage());
+ } finally {
+ table.close();
+ }
+ }
+
+ @Test
+ public void testDeleteOperationTimeout() throws Exception {
+ HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteOperationTimeout");
+ hdt.addCoprocessor(SleepAndFailFirstTime.class.getName());
+ Table table = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM },TEST_UTIL.getConfiguration());
+ table.setRpcTimeout(Integer.MAX_VALUE);
+ SleepAndFailFirstTime.ct.set(0);
+ // Check that it works if the timeout is big enough
+ table.setOperationTimeout(120 * 1000);
+ table.delete(new Delete(FAM_NAM));
+
+ // Resetting and retrying. Will fail this time, not enough time for the second try
+ SleepAndFailFirstTime.ct.set(0);
+ try {
+ table.setOperationTimeout(30 * 1000);
+ table.delete(new Delete(FAM_NAM));
+ Assert.fail("We expect an exception here");
+ } catch (RetriesExhaustedWithDetailsException e) {
+ // The client has a CallTimeout class, but it's not shared.We're not very clean today,
+ // in the general case you can expect the call to stop, but the exception may vary.
+ // In this test however, we're sure that it will be a socket timeout.
+ LOG.info("We received an exception, as expected ", e);
+ } catch (IOException e) {
+ Assert.fail("Wrong exception:" + e.getMessage());
+ } finally {
+ table.close();
+ }
+ }
+
+ @Test
public void testRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout");
hdt.addCoprocessor(SleepCoprocessor.class.getName());
@@ -419,14 +512,14 @@ public class TestHCM {
}
@Test
- public void testWriteRpcTimeout() throws Exception {
- HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testWriteRpcTimeout");
- hdt.addCoprocessor(SleepWriteCoprocessor.class.getName());
+ public void testIncrementRpcTimeout() throws Exception {
+ HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testIncrementRpcTimeout");
+ hdt.addCoprocessor(SleepCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
- t.setWriteRpcTimeout(SleepWriteCoprocessor.SLEEP_TIME / 2);
- t.setOperationTimeout(SleepWriteCoprocessor.SLEEP_TIME * 100);
+ t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
+ t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
Increment i = new Increment(FAM_NAM);
i.addColumn(FAM_NAM, FAM_NAM, 1);
t.increment(i);
@@ -436,7 +529,7 @@ public class TestHCM {
}
// Again, with configuration based override
- c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepWriteCoprocessor.SLEEP_TIME / 2);
+ c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2);
try (Connection conn = ConnectionFactory.createConnection(c)) {
try (Table t = conn.getTable(hdt.getTableName())) {
Increment i = new Increment(FAM_NAM);
@@ -450,8 +543,46 @@ public class TestHCM {
}
@Test
- public void testReadRpcTimeout() throws Exception {
- HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testReadRpcTimeout");
+ public void testDeleteRpcTimeout() throws Exception {
+ HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDeleteRpcTimeout");
+ hdt.addCoprocessor(SleepCoprocessor.class.getName());
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+ try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
+ t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
+ t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
+ Delete d = new Delete(FAM_NAM);
+ d.addColumn(FAM_NAM, FAM_NAM, 1);
+ t.delete(d);
+ fail("Write should not have succeeded");
+ } catch (RetriesExhaustedException e) {
+ // expected
+ }
+
+ }
+
+ @Test
+ public void testPutRpcTimeout() throws Exception {
+ HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testPutRpcTimeout");
+ hdt.addCoprocessor(SleepCoprocessor.class.getName());
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+ try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
+ t.setWriteRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
+ t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
+ Put p = new Put(FAM_NAM);
+ p.addColumn(FAM_NAM, FAM_NAM, FAM_NAM);
+ t.put(p);
+ fail("Write should not have succeeded");
+ } catch (RetriesExhaustedException e) {
+ // expected
+ }
+
+ }
+
+ @Test
+ public void testGetRpcTimeout() throws Exception {
+ HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testGetRpcTimeout");
hdt.addCoprocessor(SleepCoprocessor.class.getName());
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
@@ -502,6 +633,7 @@ public class TestHCM {
TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }).close();
Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+ SleepAndFailFirstTime.ct.set(0);
c.setInt(HConstants.HBASE_CLIENT_PAUSE, 3000);
c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 4000);
@@ -1013,8 +1145,7 @@ public class TestHCM {
curServer.getServerName().getPort(),
conn.getCachedLocation(TABLE_NAME, ROW).getRegionLocation().getPort());
- TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+ TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
table.close();
connection.close();
}