You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/06/02 15:21:18 UTC
[hbase] 28/28: HBASE-22526 RejectedExecutionException could be
thrown from TableOverAsyncTable.coprocessor service if the connection has
been shutown
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 77c828760c293c40acf2b5653845654bd9291541
Author: zhangduo <zh...@apache.org>
AuthorDate: Sun Jun 2 21:54:29 2019 +0800
HBASE-22526 RejectedExecutionException could be thrown from TableOverAsyncTable.coprocessor service if the connection has been shutown
---
.../hadoop/hbase/client/TableOverAsyncTable.java | 38 ++++++++++++++--------
1 file changed, 25 insertions(+), 13 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
index 30e3062..5686b09 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
@@ -38,11 +38,14 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
@@ -422,20 +425,29 @@ class TableOverAsyncTable implements Table {
// get regions covered by the row range
List<byte[]> keys = getStartKeysInRange(startKey, endKey);
Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
- for (byte[] r : keys) {
- RegionCoprocessorRpcChannel channel = coprocessorService(r);
- Future<R> future = pool.submit(new Callable<R>() {
- @Override
- public R call() throws Exception {
- R result = call.call(channel);
- byte[] region = channel.getLastRegion();
- if (callback != null) {
- callback.update(region, r, result);
+ try {
+ for (byte[] r : keys) {
+ RegionCoprocessorRpcChannel channel = coprocessorService(r);
+ Future<R> future = pool.submit(new Callable<R>() {
+ @Override
+ public R call() throws Exception {
+ R result = call.call(channel);
+ byte[] region = channel.getLastRegion();
+ if (callback != null) {
+ callback.update(region, r, result);
+ }
+ return result;
}
- return result;
- }
- });
- futures.put(r, future);
+ });
+ futures.put(r, future);
+ }
+ } catch (RejectedExecutionException e) {
+ // maybe the connection has been closed, let's check
+ if (pool.isShutdown()) {
+ throw new DoNotRetryIOException("Connection is closed", e);
+ } else {
+ throw new HBaseIOException("Coprocessor operation is rejected", e);
+ }
}
for (Map.Entry<byte[], Future<R>> e : futures.entrySet()) {
try {