You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/06/11 14:16:24 UTC

[hbase] 01/02: HBASE-22550 Throw exception when creating thread pool if the connection has already been closed

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 0fae01b7be19e95dc1a76d1f43367b0a7dea0e52
Author: zhangduo <zh...@apache.org>
AuthorDate: Fri Jun 7 20:47:43 2019 +0800

    HBASE-22550 Throw exception when creating thread pool if the connection has already been closed
---
 .../hadoop/hbase/client/AsyncConnectionImpl.java   | 10 ++---
 .../client/ConnectionOverAsyncConnection.java      | 49 ++++++++++++++++++----
 .../hadoop/hbase/client/ConnectionUtils.java       | 27 ------------
 .../hadoop/hbase/client/TableOverAsyncTable.java   | 11 +++--
 .../apache/hadoop/hbase/client/TestConnection.java | 31 +++++++++++++-
 5 files changed, 83 insertions(+), 45 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 84e1da6..78fad9e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -113,7 +114,7 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   private ChoreService authService;
 
-  private volatile boolean closed = false;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
 
   private final Optional<MetricsConnection> metrics;
 
@@ -188,14 +189,12 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   @Override
   public boolean isClosed() {
-    return closed;
+    return closed.get();
   }
 
   @Override
   public void close() {
-    // As the code below is safe to be executed in parallel, here we do not use CAS or lock, just a
-    // simple volatile flag.
-    if (closed) {
+    if (!closed.compareAndSet(false, true)) {
       return;
     }
     IOUtils.closeQuietly(clusterStatusListener);
@@ -209,7 +208,6 @@ class AsyncConnectionImpl implements AsyncConnection {
     if (c != null) {
       c.closePool();
     }
-    closed = true;
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
index 861aab0..b61cef5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
@@ -18,18 +18,25 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * The connection implementation based on {@link AsyncConnection}.
@@ -41,6 +48,10 @@ class ConnectionOverAsyncConnection implements Connection {
 
   private volatile boolean aborted = false;
 
+  // Only used for executing coprocessor calls. Users may reference the methods in the
+  // BlockingInterface of the protobuf stub, so we have to execute the call in a separate thread.
+  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in the Table and Admin
+  // interfaces.
   private volatile ExecutorService batchPool = null;
 
   private final AsyncConnectionImpl conn;
@@ -121,7 +132,7 @@ class ConnectionOverAsyncConnection implements Connection {
 
   // will be called from AsyncConnection, to avoid infinite loop as in the above method we will call
   // AsyncConnection.close.
-  void closePool() {
+  synchronized void closePool() {
     ExecutorService batchPool = this.batchPool;
     if (batchPool != null) {
       ConnectionUtils.shutdownPool(batchPool);
@@ -134,13 +145,36 @@ class ConnectionOverAsyncConnection implements Connection {
     return conn.isClosed();
   }
 
-  private ExecutorService getBatchPool() {
+  // Only used for executing coprocessor calls. Users may reference the methods in the
+  // BlockingInterface of the protobuf stub, so we have to execute the call in a separate thread.
+  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in the Table and Admin
+  // interfaces.
+  private ThreadPoolExecutor createThreadPool() {
+    Configuration conf = conn.getConfiguration();
+    int threads = conf.getInt("hbase.hconnection.threads.max", 256);
+    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
+    BlockingQueue<Runnable> workQueue =
+      new LinkedBlockingQueue<>(threads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+        HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
+    ThreadPoolExecutor tpe = new ThreadPoolExecutor(threads, threads, keepAliveTime,
+      TimeUnit.SECONDS, workQueue,
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(toString() + "-shared-%d").build());
+    tpe.allowCoreThreadTimeOut(true);
+    return tpe;
+  }
+
+  // Only used for executing coprocessor calls. Users may reference the methods in the
+  // BlockingInterface of the protobuf stub, so we have to execute the call in a separate thread.
+  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in the Table and Admin
+  // interfaces.
+  private ExecutorService getBatchPool() throws IOException {
     if (batchPool == null) {
       synchronized (this) {
+        if (isClosed()) {
+          throw new DoNotRetryIOException("Connection is closed");
+        }
         if (batchPool == null) {
-          int threads = conn.getConfiguration().getInt("hbase.hconnection.threads.max", 256);
-          this.batchPool = ConnectionUtils.getThreadPool(conn.getConfiguration(), threads, threads,
-            () -> toString() + "-shared", null);
+          this.batchPool = createThreadPool();
         }
       }
     }
@@ -153,13 +187,14 @@ class ConnectionOverAsyncConnection implements Connection {
 
       @Override
       public Table build() {
-        ExecutorService p = pool != null ? pool : getBatchPool();
+        IOExceptionSupplier<ExecutorService> poolSupplier =
+          pool != null ? () -> pool : ConnectionOverAsyncConnection.this::getBatchPool;
         return new TableOverAsyncTable(conn,
           conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS)
             .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS)
             .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS)
             .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS).build(),
-          p);
+          poolSupplier);
       }
     };
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 2ac3cd2..f1cf988 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -29,12 +29,9 @@ import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -54,7 +51,6 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -646,29 +642,6 @@ public final class ConnectionUtils {
     return future;
   }
 
-  static ThreadPoolExecutor getThreadPool(Configuration conf, int maxThreads, int coreThreads,
-      Supplier<String> threadName, BlockingQueue<Runnable> passedWorkQueue) {
-    // shared HTable thread executor not yet initialized
-    if (maxThreads == 0) {
-      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
-    }
-    if (coreThreads == 0) {
-      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
-    }
-    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
-    BlockingQueue<Runnable> workQueue = passedWorkQueue;
-    if (workQueue == null) {
-      workQueue =
-        new LinkedBlockingQueue<>(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
-          HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
-      coreThreads = maxThreads;
-    }
-    ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime,
-      TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(threadName.get()));
-    tpe.allowCoreThreadTimeOut(true);
-    return tpe;
-  }
-
   static void shutdownPool(ExecutorService pool) {
     pool.shutdown();
     try {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
index 5686b09..0a2a66e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -76,12 +77,13 @@ class TableOverAsyncTable implements Table {
 
   private final AsyncTable<?> table;
 
-  private final ExecutorService pool;
+  private final IOExceptionSupplier<ExecutorService> poolSupplier;
 
-  TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable<?> table, ExecutorService pool) {
+  TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable<?> table,
+      IOExceptionSupplier<ExecutorService> poolSupplier) {
     this.conn = conn;
     this.table = table;
-    this.pool = pool;
+    this.poolSupplier = poolSupplier;
   }
 
   @Override
@@ -423,6 +425,7 @@ class TableOverAsyncTable implements Table {
   private <R> void coprocssorService(String serviceName, byte[] startKey, byte[] endKey,
       Callback<R> callback, StubCall<R> call) throws Throwable {
     // get regions covered by the row range
+    ExecutorService pool = this.poolSupplier.get();
     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
     Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     try {
@@ -443,7 +446,7 @@ class TableOverAsyncTable implements Table {
       }
     } catch (RejectedExecutionException e) {
       // maybe the connection has been closed, let's check
-      if (pool.isShutdown()) {
+      if (conn.isClosed()) {
         throw new DoNotRetryIOException("Connection is closed", e);
       } else {
         throw new HBaseIOException("Coprocessor operation is rejected", e);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java
index 8dd4709..df715c2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import com.google.protobuf.ServiceException;
 import java.io.IOException;
 import java.util.List;
 import java.util.Set;
@@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -37,7 +39,11 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
 import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -336,7 +342,7 @@ public class TestConnection {
     TEST_UTIL.getAdmin().createTable(builder.build());
 
     try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
-        RegionLocator locator = conn.getRegionLocator(tableName)) {
+      RegionLocator locator = conn.getRegionLocator(tableName)) {
       // Get locations of the regions of the table
       List<HRegionLocation> locations = locator.getAllRegionLocations();
 
@@ -353,4 +359,27 @@ public class TestConnection {
       TEST_UTIL.deleteTable(tableName);
     }
   }
+
+  @Test(expected = DoNotRetryIOException.class)
+  public void testClosedConnection() throws ServiceException, Throwable {
+    byte[] family = Bytes.toBytes("cf");
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
+      .setCoprocessor(MultiRowMutationEndpoint.class.getName())
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
+    TEST_UTIL.getAdmin().createTable(builder.build());
+
+    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+    // cache the location
+    try (Table table = conn.getTable(tableName)) {
+      table.get(new Get(Bytes.toBytes(0)));
+    } finally {
+      conn.close();
+    }
+    Batch.Call<MultiRowMutationService, MutateRowsResponse> callable = service -> {
+      throw new RuntimeException("Should not arrive here");
+    };
+    conn.getTable(tableName).coprocessorService(MultiRowMutationService.class,
+      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, callable);
+  }
 }