You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/09/04 00:15:19 UTC
hbase git commit: HBASE-16556 The read/write timeout are not used in
HTable.delete(List), HTable.get(List),
and HTable.existsAll(List) (ChiaPing Tsai)
Repository: hbase
Updated Branches:
refs/heads/master 520c3cc4e -> 592245ff1
HBASE-16556 The read/write timeout are not used in HTable.delete(List), HTable.get(List), and HTable.existsAll(List) (ChiaPing Tsai)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/592245ff
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/592245ff
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/592245ff
Branch: refs/heads/master
Commit: 592245ff1307f72161aa64774aa891248bb968de
Parents: 520c3cc
Author: tedyu <yu...@gmail.com>
Authored: Sat Sep 3 17:14:57 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Sat Sep 3 17:14:57 2016 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/HTable.java | 32 ++++++------
.../hadoop/hbase/client/TestAsyncProcess.java | 52 ++++++++++++++++++--
.../RegionCoprocessorEnvironment.java | 2 +-
3 files changed, 68 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/592245ff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index d2423b3..e6f6f46 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -196,6 +196,12 @@ public class HTable implements Table {
cleanupConnectionOnClose = false;
// used from tests, don't trust the connection is real
this.mutator = new BufferedMutatorImpl(conn, null, null, params);
+ this.readRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
+ conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+ this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+ conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
}
/**
@@ -453,7 +459,7 @@ public class HTable implements Table {
}
try {
Object[] r1 = new Object[gets.size()];
- batch((List<? extends Row>)gets, r1);
+ batch((List<? extends Row>)gets, r1, readRpcTimeout);
// Translate.
Result [] results = new Result[r1.length];
int i = 0;
@@ -480,6 +486,15 @@ public class HTable implements Table {
}
}
+ public void batch(final List<? extends Row> actions, final Object[] results, int timeout)
+ throws InterruptedException, IOException {
+ AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null, timeout);
+ ars.waitUntilDone();
+ if (ars.hasError()) {
+ throw ars.getErrors();
+ }
+ }
+
/**
* {@inheritDoc}
*/
@@ -529,7 +544,7 @@ public class HTable implements Table {
throws IOException {
Object[] results = new Object[deletes.size()];
try {
- batch(deletes, results);
+ batch(deletes, results, writeRpcTimeout);
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
} finally {
@@ -866,7 +881,7 @@ public class HTable implements Table {
Object[] r1= new Object[exists.size()];
try {
- batch(exists, r1);
+ batch(exists, r1, readRpcTimeout);
} catch (InterruptedException e) {
throw (InterruptedIOException)new InterruptedIOException().initCause(e);
}
@@ -910,17 +925,6 @@ public class HTable implements Table {
this.batchCallback(list, results, callback);
}
-
- /**
- * Parameterized batch processing, allowing varying return types for different
- * {@link Row} implementations.
- */
- public void processBatch(final List<? extends Row> list, final Object[] results)
- throws IOException, InterruptedException {
- this.batch(list, results);
- }
-
-
@Override
public void close() throws IOException {
if (this.closed) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/592245ff/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index bcc052d..656dcfc 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -155,7 +155,7 @@ public class TestAsyncProcess {
public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
public AtomicInteger callsCt = new AtomicInteger();
private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-
+ private long previousTimeout = -1;
@Override
protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
@@ -210,7 +210,13 @@ public class TestAsyncProcess {
// We use results in tests to check things, so override to always save them.
return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
}
-
+ @Override
+ public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
+ List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
+ CancellableRegionServerCallable callable, int curTimeout) {
+ previousTimeout = curTimeout;
+ return super.submitAll(pool, tableName, rows, callback, results, callable, curTimeout);
+ }
@Override
protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
@@ -1273,7 +1279,7 @@ public class TestAsyncProcess {
Object[] res = new Object[puts.size()];
try {
- ht.processBatch(puts, res);
+ ht.batch(puts, res);
Assert.fail();
} catch (RetriesExhaustedException expected) {
}
@@ -1314,6 +1320,46 @@ public class TestAsyncProcess {
}
@Test
+ public void testReadAndWriteTimeout() throws IOException {
+ final long readTimeout = 10 * 1000;
+ final long writeTimeout = 20 * 1000;
+ Configuration copyConf = new Configuration(conf);
+ copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout);
+ copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout);
+ ClusterConnection conn = createHConnection();
+ Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
+ BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE);
+ try (HTable ht = new HTable(conn, bufferParam)) {
+ MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true);
+ ht.multiAp = ap;
+ List<Get> gets = new LinkedList<>();
+ gets.add(new Get(DUMMY_BYTES_1));
+ gets.add(new Get(DUMMY_BYTES_2));
+ try {
+ ht.get(gets);
+ } catch (ClassCastException e) {
+ // No result response on this test.
+ }
+ assertEquals(readTimeout, ap.previousTimeout);
+ ap.previousTimeout = -1;
+
+ try {
+ ht.existsAll(gets);
+ } catch (ClassCastException e) {
+ // No result response on this test.
+ }
+ assertEquals(readTimeout, ap.previousTimeout);
+ ap.previousTimeout = -1;
+
+ List<Delete> deletes = new LinkedList<>();
+ deletes.add(new Delete(DUMMY_BYTES_1));
+ deletes.add(new Delete(DUMMY_BYTES_2));
+ ht.delete(deletes);
+ assertEquals(writeTimeout, ap.previousTimeout);
+ }
+ }
+
+ @Test
public void testGlobalErrors() throws IOException {
ClusterConnection conn = new MyConnectionImpl(conf);
BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);
http://git-wip-us.apache.org/repos/asf/hbase/blob/592245ff/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
index a577748..bdf88af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
@@ -42,5 +42,5 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment {
RegionServerServices getRegionServerServices();
/** @return shared data between all instances of this coprocessor */
- ConcurrentMap<String, Object> getSharedData();
+ ConcurrentMap<String, Object> getSharedData();
}