You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/06/02 15:21:18 UTC

[hbase] 28/28: HBASE-22526 RejectedExecutionException could be thrown from TableOverAsyncTable.coprocessor service if the connection has been shutown

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 77c828760c293c40acf2b5653845654bd9291541
Author: zhangduo <zh...@apache.org>
AuthorDate: Sun Jun 2 21:54:29 2019 +0800

    HBASE-22526 RejectedExecutionException could be thrown from TableOverAsyncTable.coprocessor service if the connection has been shutown
---
 .../hadoop/hbase/client/TableOverAsyncTable.java   | 38 ++++++++++++++--------
 1 file changed, 25 insertions(+), 13 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
index 30e3062..5686b09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
@@ -38,11 +38,14 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.TableName;
@@ -422,20 +425,29 @@ class TableOverAsyncTable implements Table {
     // get regions covered by the row range
     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
     Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-    for (byte[] r : keys) {
-      RegionCoprocessorRpcChannel channel = coprocessorService(r);
-      Future<R> future = pool.submit(new Callable<R>() {
-        @Override
-        public R call() throws Exception {
-          R result = call.call(channel);
-          byte[] region = channel.getLastRegion();
-          if (callback != null) {
-            callback.update(region, r, result);
+    try {
+      for (byte[] r : keys) {
+        RegionCoprocessorRpcChannel channel = coprocessorService(r);
+        Future<R> future = pool.submit(new Callable<R>() {
+          @Override
+          public R call() throws Exception {
+            R result = call.call(channel);
+            byte[] region = channel.getLastRegion();
+            if (callback != null) {
+              callback.update(region, r, result);
+            }
+            return result;
           }
-          return result;
-        }
-      });
-      futures.put(r, future);
+        });
+        futures.put(r, future);
+      }
+    } catch (RejectedExecutionException e) {
+      // maybe the connection has been closed, let's check
+      if (pool.isShutdown()) {
+        throw new DoNotRetryIOException("Connection is closed", e);
+      } else {
+        throw new HBaseIOException("Coprocessor operation is rejected", e);
+      }
     }
     for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) {
       try {