You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by sy...@apache.org on 2016/08/17 18:34:31 UTC
[25/50] [abbrv] hbase git commit: HBASE-15866 Split hbase.rpc.timeout
into *.read.timeout and *.write.timeout
HBASE-15866 Split hbase.rpc.timeout into *.read.timeout and *.write.timeout
Signed-off-by: Andrew Purtell <ap...@apache.org>
Amending-Author: Andrew Purtell <ap...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/30d7eeae
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/30d7eeae
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/30d7eeae
Branch: refs/heads/hbase-12439
Commit: 30d7eeaefe431bc263519064662e6dc8ad8ff05a
Parents: 4e08a8b
Author: Vivek <vk...@salesforce.com>
Authored: Fri Aug 5 17:25:06 2016 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Sat Aug 6 16:55:09 2016 -0700
----------------------------------------------------------------------
.../hadoop/hbase/client/AsyncProcess.java | 5 +-
.../hbase/client/BufferedMutatorImpl.java | 8 +-
.../hbase/client/ConnectionImplementation.java | 3 +-
.../org/apache/hadoop/hbase/client/HTable.java | 58 +++++++++----
.../hadoop/hbase/client/HTableMultiplexer.java | 6 +-
.../org/apache/hadoop/hbase/client/Table.java | 43 +++++++++-
.../hadoop/hbase/client/TestAsyncProcess.java | 11 ++-
.../org/apache/hadoop/hbase/HConstants.java | 13 +++
.../hadoop/hbase/rest/client/RemoteHTable.java | 22 +++++
.../hadoop/hbase/client/HTableWrapper.java | 14 +++
.../hbase/client/HConnectionTestingUtility.java | 6 +-
.../org/apache/hadoop/hbase/client/TestHCM.java | 90 ++++++++++++++++++--
.../hbase/regionserver/RegionAsTable.java | 14 +++
13 files changed, 257 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 4514560..1383ca0 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -281,7 +281,7 @@ class AsyncProcess {
public AsyncProcess(ClusterConnection hc, Configuration conf, ExecutorService pool,
RpcRetryingCallerFactory rpcCaller, boolean useGlobalErrors,
- RpcControllerFactory rpcFactory) {
+ RpcControllerFactory rpcFactory, int rpcTimeout) {
if (hc == null) {
throw new IllegalArgumentException("ClusterConnection cannot be null.");
}
@@ -297,8 +297,7 @@ class AsyncProcess {
// how many times we could try in total, one more than retry number
this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER) + 1;
- this.timeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ this.timeout = rpcTimeout;
this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index e98ad4e..39e4f75 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -19,6 +19,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -72,6 +73,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
private final int maxKeyValueSize;
private boolean closed = false;
private final ExecutorService pool;
+ private int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
@VisibleForTesting
protected AsyncProcess ap; // non-final so can be overridden in test
@@ -94,8 +96,12 @@ public class BufferedMutatorImpl implements BufferedMutator {
this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
+ this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+ conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+
// puts need to track errors globally due to how the APIs currently work.
- ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory);
+ ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index 8dcda13..04edd25 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -1823,7 +1823,8 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
// For tests to override.
protected AsyncProcess createAsyncProcess(Configuration conf) {
// No default pool available.
- return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory);
+ int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory, rpcTimeout);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index fbd9f51..882e21b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -112,7 +112,8 @@ public class HTable implements Table {
protected long scannerMaxResultSize;
private ExecutorService pool; // For Multi & Scan
private int operationTimeout; // global timeout for each blocking method with retrying rpc
- private int rpcTimeout; // timeout for each rpc request
+ private int readRpcTimeout; // timeout for each read rpc request
+ private int writeRpcTimeout; // timeout for each write rpc request
private final boolean cleanupPoolOnClose; // shutdown the pool in close()
private final boolean cleanupConnectionOnClose; // close the connection in close()
private Consistency defaultConsistency = Consistency.STRONG;
@@ -212,8 +213,12 @@ public class HTable implements Table {
this.operationTimeout = tableName.isSystemTable() ?
connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
- this.rpcTimeout = configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
+ configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+ this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+ configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
this.scannerCaching = connConfiguration.getScannerCaching();
this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
if (this.rpcCallerFactory == null) {
@@ -257,7 +262,7 @@ public class HTable implements Table {
@Override
public HTableDescriptor getTableDescriptor() throws IOException {
HTableDescriptor htd = HBaseAdmin.getTableDescriptor(tableName, connection, rpcCallerFactory,
- rpcControllerFactory, operationTimeout, rpcTimeout);
+ rpcControllerFactory, operationTimeout, readRpcTimeout);
if (htd != null) {
return new UnmodifyableHTableDescriptor(htd);
}
@@ -430,7 +435,7 @@ public class HTable implements Table {
}
}
};
- return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
+ return rpcCallerFactory.<Result>newCaller(readRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@@ -528,7 +533,7 @@ public class HTable implements Table {
}
}
};
- rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
+ rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@@ -654,7 +659,7 @@ public class HTable implements Table {
}
}
};
- return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
+ return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@@ -686,7 +691,7 @@ public class HTable implements Table {
}
}
};
- return rpcCallerFactory.<Result> newCaller(rpcTimeout).callWithRetries(callable,
+ return rpcCallerFactory.<Result> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@@ -742,7 +747,7 @@ public class HTable implements Table {
}
}
};
- return rpcCallerFactory.<Long> newCaller(rpcTimeout).callWithRetries(callable,
+ return rpcCallerFactory.<Long> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@@ -772,7 +777,7 @@ public class HTable implements Table {
}
}
};
- return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
+ return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@@ -803,7 +808,7 @@ public class HTable implements Table {
}
}
};
- return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
+ return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@@ -833,7 +838,7 @@ public class HTable implements Table {
}
}
};
- return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
+ return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@@ -864,7 +869,7 @@ public class HTable implements Table {
}
}
};
- return rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
+ return rpcCallerFactory.<Boolean> newCaller(writeRpcTimeout).callWithRetries(callable,
this.operationTimeout);
}
@@ -1196,13 +1201,34 @@ public class HTable implements Table {
}
@Override
+ @Deprecated
public int getRpcTimeout() {
- return rpcTimeout;
+ return readRpcTimeout;
}
@Override
+ @Deprecated
public void setRpcTimeout(int rpcTimeout) {
- this.rpcTimeout = rpcTimeout;
+ this.readRpcTimeout = rpcTimeout;
+ this.writeRpcTimeout = rpcTimeout;
+ }
+
+ @Override
+ public int getWriteRpcTimeout() {
+ return writeRpcTimeout;
+ }
+
+ @Override
+ public void setWriteRpcTimeout(int writeRpcTimeout) {
+ this.writeRpcTimeout = writeRpcTimeout;
+ }
+
+ @Override
+ public int getReadRpcTimeout() { return readRpcTimeout; }
+
+ @Override
+ public void setReadRpcTimeout(int readRpcTimeout) {
+ this.readRpcTimeout = readRpcTimeout;
}
@Override
@@ -1282,7 +1308,7 @@ public class HTable implements Table {
AsyncProcess asyncProcess =
new AsyncProcess(connection, configuration, pool,
RpcRetryingCallerFactory.instantiate(configuration, connection.getStatisticsTracker()),
- true, RpcControllerFactory.instantiate(configuration));
+ true, RpcControllerFactory.instantiate(configuration), readRpcTimeout);
AsyncRequestFuture future = asyncProcess.submitAll(tableName, execs,
new Callback<ClientProtos.CoprocessorServiceResult>() {
http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
index f1bbcb3..ba963c2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTableMultiplexer.java
@@ -442,6 +442,7 @@ public class HTableMultiplexer {
private final ScheduledExecutorService executor;
private final int maxRetryInQueue;
private final AtomicInteger retryInQueue = new AtomicInteger(0);
+ private final int writeRpcTimeout; // needed to pass in through AsyncProcess constructor
public FlushWorker(Configuration conf, ClusterConnection conn, HRegionLocation addr,
HTableMultiplexer htableMultiplexer, int perRegionServerBufferQueueSize,
@@ -451,7 +452,10 @@ public class HTableMultiplexer {
this.queue = new LinkedBlockingQueue<>(perRegionServerBufferQueueSize);
RpcRetryingCallerFactory rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf);
RpcControllerFactory rpcControllerFactory = RpcControllerFactory.instantiate(conf);
- this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory);
+ this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+ conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+ this.ap = new AsyncProcess(conn, conf, pool, rpcCallerFactory, false, rpcControllerFactory, writeRpcTimeout);
this.executor = executor;
this.maxRetryInQueue = conf.getInt(TABLE_MULTIPLEXER_MAX_RETRIES_IN_QUEUE, 10000);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
index f2cec97..4d93442 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Table.java
@@ -585,17 +585,56 @@ public interface Table extends Closeable {
int getOperationTimeout();
/**
+ * Get timeout (millisecond) of each rpc request in this Table instance.
+ *
+ * @return Currently configured read timeout
+ * @deprecated Use getReadRpcTimeout or getWriteRpcTimeout instead
+ */
+ @Deprecated
+ int getRpcTimeout();
+
+ /**
* Set timeout (millisecond) of each rpc request in operations of this Table instance, will
* override the value of hbase.rpc.timeout in configuration.
* If a rpc request waiting too long, it will stop waiting and send a new request to retry until
* retries exhausted or operation timeout reached.
+ * <p>
+ * NOTE: This will set both the read and write timeout settings to the provided value.
+ *
* @param rpcTimeout the timeout of each rpc request in millisecond.
+ *
+ * @deprecated Use setReadRpcTimeout or setWriteRpcTimeout instead
*/
+ @Deprecated
void setRpcTimeout(int rpcTimeout);
/**
- * Get timeout (millisecond) of each rpc request in this Table instance.
+ * Get timeout (millisecond) of each rpc read request in this Table instance.
*/
- int getRpcTimeout();
+ int getReadRpcTimeout();
+
+ /**
+ * Set timeout (millisecond) of each rpc read request in operations of this Table instance. This
+ * overrides the value of hbase.rpc.read.timeout in configuration.
+ * If an rpc read request waits too long, the client stops waiting and sends a new request,
+ * retrying until retries are exhausted or the operation timeout is reached.
+ *
+ * @param readRpcTimeout the timeout of each rpc read request in milliseconds.
+ */
+ void setReadRpcTimeout(int readRpcTimeout);
+ /**
+ * Get timeout (millisecond) of each rpc write request in this Table instance.
+ */
+ int getWriteRpcTimeout();
+
+ /**
+ * Set timeout (millisecond) of each rpc write request in operations of this Table instance. This
+ * overrides the value of hbase.rpc.write.timeout in configuration.
+ * If an rpc write request waits too long, the client stops waiting and sends a new request,
+ * retrying until retries are exhausted or the operation timeout is reached.
+ *
+ * @param writeRpcTimeout the timeout of each rpc write request in milliseconds.
+ */
+ void setWriteRpcTimeout(int writeRpcTimeout);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index d943316..0aa9704 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -138,6 +138,7 @@ public class TestAsyncProcess {
final AtomicInteger nbActions = new AtomicInteger();
public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
public AtomicInteger callsCt = new AtomicInteger();
+ private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
@Override
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
@@ -157,14 +158,14 @@ public class TestAsyncProcess {
public MyAsyncProcess(ClusterConnection hc, Configuration conf, AtomicInteger nbThreads) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(nbThreads)),
- new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf));
+ new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(conf), rpcTimeout);
}
public MyAsyncProcess(
ClusterConnection hc, Configuration conf, boolean useGlobalErrors) {
super(hc, conf, new ThreadPoolExecutor(1, 20, 60, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(), new CountingThreadFactory(new AtomicInteger())),
- new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
+ new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout);
}
public MyAsyncProcess(ClusterConnection hc, Configuration conf, boolean useGlobalErrors,
@@ -176,7 +177,7 @@ public class TestAsyncProcess {
throw new RejectedExecutionException("test under failure");
}
},
- new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf));
+ new RpcRetryingCallerFactory(conf), useGlobalErrors, new RpcControllerFactory(conf), rpcTimeout);
}
@Override
@@ -1111,10 +1112,12 @@ public class TestAsyncProcess {
}
static class AsyncProcessForThrowableCheck extends AsyncProcess {
+ private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+
public AsyncProcessForThrowableCheck(ClusterConnection hc, Configuration conf,
ExecutorService pool) {
super(hc, conf, pool, new RpcRetryingCallerFactory(conf), false, new RpcControllerFactory(
- conf));
+ conf), rpcTimeout);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 256c374..ce18ef5 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -815,10 +815,23 @@ public final class HConstants {
/**
* timeout for each RPC
+ * @deprecated Use {@link #HBASE_RPC_READ_TIMEOUT_KEY} or {@link #HBASE_RPC_WRITE_TIMEOUT_KEY}
+ * instead.
*/
+ @Deprecated
public static final String HBASE_RPC_TIMEOUT_KEY = "hbase.rpc.timeout";
/**
+ * timeout for each read RPC
+ */
+ public static final String HBASE_RPC_READ_TIMEOUT_KEY = "hbase.rpc.read.timeout";
+
+ /**
+ * timeout for each write RPC
+ */
+ public static final String HBASE_RPC_WRITE_TIMEOUT_KEY = "hbase.rpc.write.timeout";
+
+ /**
* Default value of {@link #HBASE_RPC_TIMEOUT_KEY}
*/
public static final int DEFAULT_HBASE_RPC_TIMEOUT = 60000;
http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
----------------------------------------------------------------------
diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
index b9e393e..33c2fc2 100644
--- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
+++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/client/RemoteHTable.java
@@ -860,12 +860,34 @@ public class RemoteHTable implements Table {
}
@Override
+ @Deprecated
public void setRpcTimeout(int rpcTimeout) {
throw new UnsupportedOperationException();
}
@Override
+ @Deprecated
public int getRpcTimeout() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public int getReadRpcTimeout() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setReadRpcTimeout(int readRpcTimeout) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getWriteRpcTimeout() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void setWriteRpcTimeout(int writeRpcTimeout) {
+ throw new UnsupportedOperationException();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java
index 5da0df7..6a73261 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/HTableWrapper.java
@@ -308,12 +308,26 @@ public final class HTableWrapper implements Table {
}
@Override
+ @Deprecated
public void setRpcTimeout(int rpcTimeout) {
table.setRpcTimeout(rpcTimeout);
}
@Override
+ public void setWriteRpcTimeout(int writeRpcTimeout) { table.setWriteRpcTimeout(writeRpcTimeout); }
+
+ @Override
+ public void setReadRpcTimeout(int readRpcTimeout) { table.setReadRpcTimeout(readRpcTimeout); }
+
+ @Override
+ @Deprecated
public int getRpcTimeout() {
return table.getRpcTimeout();
}
+
+ @Override
+ public int getWriteRpcTimeout() { return table.getWriteRpcTimeout(); }
+
+ @Override
+ public int getReadRpcTimeout() { return table.getReadRpcTimeout(); }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
index 265e3c1..036b38f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/HConnectionTestingUtility.java
@@ -23,6 +23,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.ServerName;
@@ -126,7 +127,8 @@ public class HConnectionTestingUtility {
Mockito.when(c.getNonceGenerator()).thenReturn(ng);
Mockito.when(c.getAsyncProcess()).thenReturn(
new AsyncProcess(c, conf, null, RpcRetryingCallerFactory.instantiate(conf), false,
- RpcControllerFactory.instantiate(conf)));
+ RpcControllerFactory.instantiate(conf), conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT)));
Mockito.when(c.getNewRpcRetryingCallerFactory(conf)).thenReturn(
RpcRetryingCallerFactory.instantiate(conf,
RetryingCallerInterceptorFactory.NO_OP_INTERCEPTOR, null));
@@ -194,4 +196,4 @@ public class HConnectionTestingUtility {
return result;
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 1b20b76..4d47bde 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -18,11 +18,7 @@
*/
package org.apache.hadoop.hbase.client;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.*;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -149,6 +145,16 @@ public class TestHCM {
}
}
+ public static class SleepWriteCoprocessor extends BaseRegionObserver {
+ public static final int SLEEP_TIME = 5000;
+ @Override
+ public Result preIncrement(final ObserverContext<RegionCoprocessorEnvironment> e,
+ final Increment increment) throws IOException {
+ Threads.sleep(SLEEP_TIME);
+ return super.preIncrement(e, increment);
+ }
+ }
+
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
@@ -351,7 +357,7 @@ public class TestHCM {
}
}
- @Test(expected = RetriesExhaustedException.class)
+ @Test
public void testRpcTimeout() throws Exception {
HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testRpcTimeout");
hdt.addCoprocessor(SleepCoprocessor.class.getName());
@@ -361,6 +367,78 @@ public class TestHCM {
t.setRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
t.get(new Get(FAM_NAM));
+ fail("Get should not have succeeded");
+ } catch (RetriesExhaustedException e) {
+ // expected
+ }
+
+ // Again, with configuration based override
+ c.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2);
+ try (Connection conn = ConnectionFactory.createConnection(c)) {
+ try (Table t = conn.getTable(hdt.getTableName())) {
+ t.get(new Get(FAM_NAM));
+ fail("Get should not have succeeded");
+ } catch (RetriesExhaustedException e) {
+ // expected
+ }
+ }
+ }
+
+ @Test
+ public void testWriteRpcTimeout() throws Exception {
+ HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testWriteRpcTimeout");
+ hdt.addCoprocessor(SleepWriteCoprocessor.class.getName());
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+ try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
+ t.setWriteRpcTimeout(SleepWriteCoprocessor.SLEEP_TIME / 2);
+ t.setOperationTimeout(SleepWriteCoprocessor.SLEEP_TIME * 100);
+ Increment i = new Increment(FAM_NAM);
+ i.addColumn(FAM_NAM, FAM_NAM, 1);
+ t.increment(i);
+ fail("Write should not have succeeded");
+ } catch (RetriesExhaustedException e) {
+ // expected
+ }
+
+ // Again, with configuration based override
+ c.setInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, SleepWriteCoprocessor.SLEEP_TIME / 2);
+ try (Connection conn = ConnectionFactory.createConnection(c)) {
+ try (Table t = conn.getTable(hdt.getTableName())) {
+ Increment i = new Increment(FAM_NAM);
+ i.addColumn(FAM_NAM, FAM_NAM, 1);
+ t.increment(i);
+ fail("Write should not have succeeded");
+ } catch (RetriesExhaustedException e) {
+ // expected
+ }
+ }
+ }
+
+ @Test
+ public void testReadRpcTimeout() throws Exception {
+ HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testReadRpcTimeout");
+ hdt.addCoprocessor(SleepCoprocessor.class.getName());
+ Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+
+ try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
+ t.setReadRpcTimeout(SleepCoprocessor.SLEEP_TIME / 2);
+ t.setOperationTimeout(SleepCoprocessor.SLEEP_TIME * 100);
+ t.get(new Get(FAM_NAM));
+ fail("Get should not have succeeded");
+ } catch (RetriesExhaustedException e) {
+ // expected
+ }
+
+ // Again, with configuration based override
+ c.setInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, SleepCoprocessor.SLEEP_TIME / 2);
+ try (Connection conn = ConnectionFactory.createConnection(c)) {
+ try (Table t = conn.getTable(hdt.getTableName())) {
+ t.get(new Get(FAM_NAM));
+ fail("Get should not have succeeded");
+ } catch (RetriesExhaustedException e) {
+ // expected
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/30d7eeae/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
index 770c39b..d2e78b7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/RegionAsTable.java
@@ -333,12 +333,26 @@ public class RegionAsTable implements Table {
}
@Override
+ @Deprecated
public void setRpcTimeout(int rpcTimeout) {
throw new UnsupportedOperationException();
}
@Override
+ public void setWriteRpcTimeout(int writeRpcTimeout) {throw new UnsupportedOperationException(); }
+
+ @Override
+ public void setReadRpcTimeout(int readRpcTimeout) {throw new UnsupportedOperationException(); }
+
+ @Override
+ @Deprecated
public int getRpcTimeout() {
throw new UnsupportedOperationException();
}
+
+ @Override
+ public int getWriteRpcTimeout() { throw new UnsupportedOperationException(); }
+
+ @Override
+ public int getReadRpcTimeout() { throw new UnsupportedOperationException(); }
}
\ No newline at end of file