You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/09/04 00:15:19 UTC

hbase git commit: HBASE-16556 The read/write timeouts are not used in HTable.delete(List), HTable.get(List), and HTable.existsAll(List) (ChiaPing Tsai)

Repository: hbase
Updated Branches:
  refs/heads/master 520c3cc4e -> 592245ff1


HBASE-16556 The read/write timeouts are not used in HTable.delete(List), HTable.get(List), and HTable.existsAll(List) (ChiaPing Tsai)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/592245ff
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/592245ff
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/592245ff

Branch: refs/heads/master
Commit: 592245ff1307f72161aa64774aa891248bb968de
Parents: 520c3cc
Author: tedyu <yu...@gmail.com>
Authored: Sat Sep 3 17:14:57 2016 -0700
Committer: tedyu <yu...@gmail.com>
Committed: Sat Sep 3 17:14:57 2016 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/HTable.java  | 32 ++++++------
 .../hadoop/hbase/client/TestAsyncProcess.java   | 52 ++++++++++++++++++--
 .../RegionCoprocessorEnvironment.java           |  2 +-
 3 files changed, 68 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/592245ff/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index d2423b3..e6f6f46 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -196,6 +196,12 @@ public class HTable implements Table {
     cleanupConnectionOnClose = false;
     // used from tests, don't trust the connection is real
     this.mutator = new BufferedMutatorImpl(conn, null, null, params);
+    this.readRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
+        conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+    this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+        conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
   }
 
   /**
@@ -453,7 +459,7 @@ public class HTable implements Table {
     }
     try {
       Object[] r1 = new Object[gets.size()];
-      batch((List<? extends Row>)gets, r1);
+      batch((List<? extends Row>)gets, r1, readRpcTimeout);
       // Translate.
       Result [] results = new Result[r1.length];
       int i = 0;
@@ -480,6 +486,15 @@ public class HTable implements Table {
     }
   }
 
+  public void batch(final List<? extends Row> actions, final Object[] results, int timeout)
+      throws InterruptedException, IOException {
+    AsyncRequestFuture ars = multiAp.submitAll(pool, tableName, actions, null, results, null, timeout);
+    ars.waitUntilDone();
+    if (ars.hasError()) {
+      throw ars.getErrors();
+    }
+  }
+
   /**
    * {@inheritDoc}
    */
@@ -529,7 +544,7 @@ public class HTable implements Table {
   throws IOException {
     Object[] results = new Object[deletes.size()];
     try {
-      batch(deletes, results);
+      batch(deletes, results, writeRpcTimeout);
     } catch (InterruptedException e) {
       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
     } finally {
@@ -866,7 +881,7 @@ public class HTable implements Table {
 
     Object[] r1= new Object[exists.size()];
     try {
-      batch(exists, r1);
+      batch(exists, r1, readRpcTimeout);
     } catch (InterruptedException e) {
       throw (InterruptedIOException)new InterruptedIOException().initCause(e);
     }
@@ -910,17 +925,6 @@ public class HTable implements Table {
     this.batchCallback(list, results, callback);
   }
 
-
-  /**
-   * Parameterized batch processing, allowing varying return types for different
-   * {@link Row} implementations.
-   */
-  public void processBatch(final List<? extends Row> list, final Object[] results)
-    throws IOException, InterruptedException {
-    this.batch(list, results);
-  }
-
-
   @Override
   public void close() throws IOException {
     if (this.closed) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/592245ff/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index bcc052d..656dcfc 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -155,7 +155,7 @@ public class TestAsyncProcess {
     public List<AsyncRequestFuture> allReqs = new ArrayList<AsyncRequestFuture>();
     public AtomicInteger callsCt = new AtomicInteger();
     private static int rpcTimeout = conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
-
+    private long previousTimeout = -1;
     @Override
     protected <Res> AsyncRequestFutureImpl<Res> createAsyncRequestFuture(TableName tableName,
         List<Action<Row>> actions, long nonceGroup, ExecutorService pool,
@@ -210,7 +210,13 @@ public class TestAsyncProcess {
       // We use results in tests to check things, so override to always save them.
       return super.submit(DUMMY_TABLE, rows, atLeastOne, callback, true);
     }
-
+    @Override
+    public <CResult> AsyncRequestFuture submitAll(ExecutorService pool, TableName tableName,
+      List<? extends Row> rows, Batch.Callback<CResult> callback, Object[] results,
+      CancellableRegionServerCallable callable, int curTimeout) {
+      previousTimeout = curTimeout;
+      return super.submitAll(pool, tableName, rows, callback, results, callable, curTimeout);
+    }
 
     @Override
     protected void updateStats(ServerName server, Map<byte[], MultiResponse.RegionResult> results) {
@@ -1273,7 +1279,7 @@ public class TestAsyncProcess {
 
     Object[] res = new Object[puts.size()];
     try {
-      ht.processBatch(puts, res);
+      ht.batch(puts, res);
       Assert.fail();
     } catch (RetriesExhaustedException expected) {
     }
@@ -1314,6 +1320,46 @@ public class TestAsyncProcess {
   }
 
   @Test
+  public void testReadAndWriteTimeout() throws IOException {
+    final long readTimeout = 10 * 1000;
+    final long writeTimeout = 20 * 1000;
+    Configuration copyConf = new Configuration(conf);
+    copyConf.setLong(HConstants.HBASE_RPC_READ_TIMEOUT_KEY, readTimeout);
+    copyConf.setLong(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY, writeTimeout);
+    ClusterConnection conn = createHConnection();
+    Mockito.when(conn.getConfiguration()).thenReturn(copyConf);
+    BufferedMutatorParams bufferParam = new BufferedMutatorParams(DUMMY_TABLE);
+    try (HTable ht = new HTable(conn, bufferParam)) {
+      MyAsyncProcess ap = new MyAsyncProcess(conn, copyConf, true);
+      ht.multiAp = ap;
+      List<Get> gets = new LinkedList<>();
+      gets.add(new Get(DUMMY_BYTES_1));
+      gets.add(new Get(DUMMY_BYTES_2));
+      try {
+        ht.get(gets);
+      } catch (ClassCastException e) {
+        // No result response on this test.
+      }
+      assertEquals(readTimeout, ap.previousTimeout);
+      ap.previousTimeout = -1;
+
+      try {
+        ht.existsAll(gets);
+      } catch (ClassCastException e) {
+        // No result response on this test.
+      }
+      assertEquals(readTimeout, ap.previousTimeout);
+      ap.previousTimeout = -1;
+
+      List<Delete> deletes = new LinkedList<>();
+      deletes.add(new Delete(DUMMY_BYTES_1));
+      deletes.add(new Delete(DUMMY_BYTES_2));
+      ht.delete(deletes);
+      assertEquals(writeTimeout, ap.previousTimeout);
+    }
+  }
+
+  @Test
   public void testGlobalErrors() throws IOException {
     ClusterConnection conn = new MyConnectionImpl(conf);
     BufferedMutatorImpl mutator = (BufferedMutatorImpl) conn.getBufferedMutator(DUMMY_TABLE);

http://git-wip-us.apache.org/repos/asf/hbase/blob/592245ff/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
index a577748..bdf88af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionCoprocessorEnvironment.java
@@ -42,5 +42,5 @@ public interface RegionCoprocessorEnvironment extends CoprocessorEnvironment {
   RegionServerServices getRegionServerServices();
 
   /** @return shared data between all instances of this coprocessor */
-  ConcurrentMap<String, Object> getSharedData(); 
+  ConcurrentMap<String, Object> getSharedData();
 }