Posted to commits@hbase.apache.org by zh...@apache.org on 2019/06/11 14:16:23 UTC

[hbase] branch HBASE-21512 updated (50cf241 -> 6ab153c)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a change to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git.


 discard 50cf241  HBASE-22553 NPE in RegionReplicaReplicationEndpoint
 discard 5b1093e  HBASE-22550 Do not use Threads.newDaemonThreadFactory in ConnectionUtils.getThreadPool
     new 0fae01b  HBASE-22550 Throw exception when creating thread pool if the connection has already been closed
     new 6ab153c  HBASE-22553 NPE in RegionReplicaReplicationEndpoint

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
branch are not in the new version. This situation occurs when a user
force-pushes a change (git push --force), leaving the branch history
looking something like this:

 * -- * -- B -- O -- O -- O   (50cf241)
            \
             N -- N -- N   refs/heads/HBASE-21512 (6ab153c)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../hadoop/hbase/client/AsyncConnectionImpl.java   | 10 +++----
 .../client/ConnectionOverAsyncConnection.java      | 14 +++++++---
 .../hadoop/hbase/client/TableOverAsyncTable.java   | 11 +++++---
 .../apache/hadoop/hbase/client/TestConnection.java | 31 +++++++++++++++++++++-
 4 files changed, 51 insertions(+), 15 deletions(-)


[hbase] 02/02: HBASE-22553 NPE in RegionReplicaReplicationEndpoint

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 6ab153c32f89dd2abdb7944e2b1eab67a0986054
Author: zhangduo <zh...@apache.org>
AuthorDate: Sat Jun 8 20:40:23 2019 +0800

    HBASE-22553 NPE in RegionReplicaReplicationEndpoint
---
 .../RegionReplicaReplicationEndpoint.java          | 51 +++++++++++++---------
 1 file changed, 30 insertions(+), 21 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index cc2650f..2c3b19b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -32,12 +32,15 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HBaseIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RegionLocations;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.WALEntryFilter;
@@ -162,9 +165,9 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
           return;
         }
         // check if the number of region replicas is correct, and also the primary region name
-        // matches, and also there is no null elements in the returned RegionLocations
+        // matches.
         if (locs.size() == tableDesc.getRegionReplication() &&
-          locs.size() == locs.numNonNullElements() &&
+          locs.getDefaultRegionLocation() != null &&
           Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
             encodedRegionName)) {
           future.complete(locs);
@@ -182,8 +185,8 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
       future.complete(Long.valueOf(entries.size()));
       return;
     }
-    if (!Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
-      encodedRegionName)) {
+    RegionInfo defaultReplica = locs.getDefaultRegionLocation().getRegion();
+    if (!Bytes.equals(defaultReplica.getEncodedNameAsBytes(), encodedRegionName)) {
       // the region name is not equal, this usually means the region has been split or merged, so
       // give up replicating as the new region(s) should already have all the data of the parent
       // region(s).
@@ -191,7 +194,7 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
         LOG.trace(
           "Skipping {} entries in table {} because located region {} is different than" +
             " the original region {} from WALEdit",
-          tableDesc.getTableName(), locs.getDefaultRegionLocation().getRegion().getEncodedName(),
+          tableDesc.getTableName(), defaultReplica.getEncodedName(),
           Bytes.toStringBinary(encodedRegionName));
       }
       future.complete(Long.valueOf(entries.size()));
@@ -202,24 +205,26 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     AtomicLong skippedEdits = new AtomicLong(0);
 
     for (int i = 1, n = locs.size(); i < n; i++) {
-      final int replicaId = i;
-      FutureUtils.addListener(connection.replay(tableDesc.getTableName(),
-        locs.getRegionLocation(replicaId).getRegion().getEncodedNameAsBytes(), row, entries,
-        replicaId, numRetries, operationTimeoutNs), (r, e) -> {
-          if (e != null) {
-            LOG.warn("Failed to replicate to {}", locs.getRegionLocation(replicaId), e);
-            error.compareAndSet(null, e);
-          } else {
-            AtomicUtils.updateMax(skippedEdits, r.longValue());
-          }
-          if (remainingTasks.decrementAndGet() == 0) {
-            if (error.get() != null) {
-              future.completeExceptionally(error.get());
+      // Do not use the elements other than the default replica as they may be null. We will fail
+      // earlier if the location for default replica is null.
+      final RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(defaultReplica, i);
+      FutureUtils
+        .addListener(connection.replay(tableDesc.getTableName(), replica.getEncodedNameAsBytes(),
+          row, entries, replica.getReplicaId(), numRetries, operationTimeoutNs), (r, e) -> {
+            if (e != null) {
+              LOG.warn("Failed to replicate to {}", replica, e);
+              error.compareAndSet(null, e);
             } else {
-              future.complete(skippedEdits.get());
+              AtomicUtils.updateMax(skippedEdits, r.longValue());
             }
-          }
-        });
+            if (remainingTasks.decrementAndGet() == 0) {
+              if (error.get() != null) {
+                future.completeExceptionally(error.get());
+              } else {
+                future.complete(skippedEdits.get());
+              }
+            }
+          });
     }
   }
 
@@ -245,6 +250,10 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
     FutureUtils.addListener(locateFuture, (locs, error) -> {
       if (error != null) {
         future.completeExceptionally(error);
+      } else if (locs.getDefaultRegionLocation() == null) {
+        future.completeExceptionally(
+          new HBaseIOException("No location found for default replica of table=" +
+            tableDesc.getTableName() + " row='" + Bytes.toStringBinary(row) + "'"));
       } else {
         replicate(future, locs, tableDesc, encodedRegionName, row, entries);
       }
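
For context, the core of this fix is to stop reading the non-default entries of
RegionLocations (they may be null, which is what triggered the NPE) and instead derive each
secondary replica's RegionInfo from the default replica, failing fast when even the default
location is missing. A minimal sketch of that pattern, using the same client API the diff
relies on, follows; the helper class and the regionReplication parameter are illustrative
only and not part of the commit:

import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;

public final class ReplicaTargets {

  /**
   * Returns the RegionInfo of every secondary replica, derived from the default replica.
   * regionReplication is the table's configured replica count
   * (tableDesc.getRegionReplication() in the endpoint).
   */
  static List<RegionInfo> secondaryReplicas(RegionLocations locs, int regionReplication)
      throws HBaseIOException {
    if (locs.getDefaultRegionLocation() == null) {
      // Fail early, as the new check in the locate callback does, instead of NPE-ing later.
      throw new HBaseIOException("No location found for default replica");
    }
    RegionInfo defaultReplica = locs.getDefaultRegionLocation().getRegion();
    List<RegionInfo> replicas = new ArrayList<>();
    for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
      // Do not index locs for the secondary replicas; those elements may be null.
      replicas.add(RegionReplicaUtil.getRegionInfoForReplica(defaultReplica, replicaId));
    }
    return replicas;
  }

  private ReplicaTargets() {
  }
}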


[hbase] 01/02: HBASE-22550 Throw exception when creating thread pool if the connection has already been closed

Posted by zh...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 0fae01b7be19e95dc1a76d1f43367b0a7dea0e52
Author: zhangduo <zh...@apache.org>
AuthorDate: Fri Jun 7 20:47:43 2019 +0800

    HBASE-22550 Throw exception when creating thread pool if the connection has already been closed
---
 .../hadoop/hbase/client/AsyncConnectionImpl.java   | 10 ++---
 .../client/ConnectionOverAsyncConnection.java      | 49 ++++++++++++++++++----
 .../hadoop/hbase/client/ConnectionUtils.java       | 27 ------------
 .../hadoop/hbase/client/TableOverAsyncTable.java   | 11 +++--
 .../apache/hadoop/hbase/client/TestConnection.java | 31 +++++++++++++-
 5 files changed, 83 insertions(+), 45 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index 84e1da6..78fad9e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -113,7 +114,7 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   private ChoreService authService;
 
-  private volatile boolean closed = false;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
 
   private final Optional<MetricsConnection> metrics;
 
@@ -188,14 +189,12 @@ class AsyncConnectionImpl implements AsyncConnection {
 
   @Override
   public boolean isClosed() {
-    return closed;
+    return closed.get();
   }
 
   @Override
   public void close() {
-    // As the code below is safe to be executed in parallel, here we do not use CAS or lock, just a
-    // simple volatile flag.
-    if (closed) {
+    if (!closed.compareAndSet(false, true)) {
       return;
     }
     IOUtils.closeQuietly(clusterStatusListener);
@@ -209,7 +208,6 @@ class AsyncConnectionImpl implements AsyncConnection {
     if (c != null) {
       c.closePool();
     }
-    closed = true;
   }
 
   @Override
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
index 861aab0..b61cef5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionOverAsyncConnection.java
@@ -18,18 +18,25 @@
 package org.apache.hadoop.hbase.client;
 
 import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
+import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
  * The connection implementation based on {@link AsyncConnection}.
@@ -41,6 +48,10 @@ class ConnectionOverAsyncConnection implements Connection {
 
   private volatile boolean aborted = false;
 
+  // only used for executing coprocessor calls, as users may reference the methods in the
+  // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
+  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
+  // interface.
   private volatile ExecutorService batchPool = null;
 
   private final AsyncConnectionImpl conn;
@@ -121,7 +132,7 @@ class ConnectionOverAsyncConnection implements Connection {
 
   // will be called from AsyncConnection, to avoid infinite loop as in the above method we will call
   // AsyncConnection.close.
-  void closePool() {
+  synchronized void closePool() {
     ExecutorService batchPool = this.batchPool;
     if (batchPool != null) {
       ConnectionUtils.shutdownPool(batchPool);
@@ -134,13 +145,36 @@ class ConnectionOverAsyncConnection implements Connection {
     return conn.isClosed();
   }
 
-  private ExecutorService getBatchPool() {
+  // only used for executing coprocessor calls, as users may reference the methods in the
+  // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
+  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
+  // interface.
+  private ThreadPoolExecutor createThreadPool() {
+    Configuration conf = conn.getConfiguration();
+    int threads = conf.getInt("hbase.hconnection.threads.max", 256);
+    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
+    BlockingQueue<Runnable> workQueue =
+      new LinkedBlockingQueue<>(threads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+        HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
+    ThreadPoolExecutor tpe = new ThreadPoolExecutor(threads, threads, keepAliveTime,
+      TimeUnit.SECONDS, workQueue,
+      new ThreadFactoryBuilder().setDaemon(true).setNameFormat(toString() + "-shared-%d").build());
+    tpe.allowCoreThreadTimeOut(true);
+    return tpe;
+  }
+
+  // only used for executing coprocessor calls, as users may reference the methods in the
+  // BlockingInterface of the protobuf stub so we have to execute the call in a separated thread...
+  // Will be removed in 4.0.0 along with the deprecated coprocessor methods in Table and Admin
+  // interface.
+  private ExecutorService getBatchPool() throws IOException {
     if (batchPool == null) {
       synchronized (this) {
+        if (isClosed()) {
+          throw new DoNotRetryIOException("Connection is closed");
+        }
         if (batchPool == null) {
-          int threads = conn.getConfiguration().getInt("hbase.hconnection.threads.max", 256);
-          this.batchPool = ConnectionUtils.getThreadPool(conn.getConfiguration(), threads, threads,
-            () -> toString() + "-shared", null);
+          this.batchPool = createThreadPool();
         }
       }
     }
@@ -153,13 +187,14 @@ class ConnectionOverAsyncConnection implements Connection {
 
       @Override
       public Table build() {
-        ExecutorService p = pool != null ? pool : getBatchPool();
+        IOExceptionSupplier<ExecutorService> poolSupplier =
+          pool != null ? () -> pool : ConnectionOverAsyncConnection.this::getBatchPool;
         return new TableOverAsyncTable(conn,
           conn.getTableBuilder(tableName).setRpcTimeout(rpcTimeout, TimeUnit.MILLISECONDS)
             .setReadRpcTimeout(readRpcTimeout, TimeUnit.MILLISECONDS)
             .setWriteRpcTimeout(writeRpcTimeout, TimeUnit.MILLISECONDS)
             .setOperationTimeout(operationTimeout, TimeUnit.MILLISECONDS).build(),
-          p);
+          poolSupplier);
       }
     };
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
index 2ac3cd2..f1cf988 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionUtils.java
@@ -29,12 +29,9 @@ import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
@@ -54,7 +51,6 @@ import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.ipc.ServerRpcController;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
-import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.DNS;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -646,29 +642,6 @@ public final class ConnectionUtils {
     return future;
   }
 
-  static ThreadPoolExecutor getThreadPool(Configuration conf, int maxThreads, int coreThreads,
-      Supplier<String> threadName, BlockingQueue<Runnable> passedWorkQueue) {
-    // shared HTable thread executor not yet initialized
-    if (maxThreads == 0) {
-      maxThreads = Runtime.getRuntime().availableProcessors() * 8;
-    }
-    if (coreThreads == 0) {
-      coreThreads = Runtime.getRuntime().availableProcessors() * 8;
-    }
-    long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
-    BlockingQueue<Runnable> workQueue = passedWorkQueue;
-    if (workQueue == null) {
-      workQueue =
-        new LinkedBlockingQueue<>(maxThreads * conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
-          HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
-      coreThreads = maxThreads;
-    }
-    ThreadPoolExecutor tpe = new ThreadPoolExecutor(coreThreads, maxThreads, keepAliveTime,
-      TimeUnit.SECONDS, workQueue, Threads.newDaemonThreadFactory(threadName.get()));
-    tpe.allowCoreThreadTimeOut(true);
-    return tpe;
-  }
-
   static void shutdownPool(ExecutorService pool) {
     pool.shutdown();
     try {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
index 5686b09..0a2a66e 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableOverAsyncTable.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hbase.client.coprocessor.Batch.Callback;
 import org.apache.hadoop.hbase.io.TimeRange;
 import org.apache.hadoop.hbase.ipc.CoprocessorRpcChannel;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConcurrentMapUtils.IOExceptionSupplier;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FutureUtils;
 import org.apache.hadoop.hbase.util.Pair;
@@ -76,12 +77,13 @@ class TableOverAsyncTable implements Table {
 
   private final AsyncTable<?> table;
 
-  private final ExecutorService pool;
+  private final IOExceptionSupplier<ExecutorService> poolSupplier;
 
-  TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable<?> table, ExecutorService pool) {
+  TableOverAsyncTable(AsyncConnectionImpl conn, AsyncTable<?> table,
+      IOExceptionSupplier<ExecutorService> poolSupplier) {
     this.conn = conn;
     this.table = table;
-    this.pool = pool;
+    this.poolSupplier = poolSupplier;
   }
 
   @Override
@@ -423,6 +425,7 @@ class TableOverAsyncTable implements Table {
   private <R> void coprocssorService(String serviceName, byte[] startKey, byte[] endKey,
       Callback<R> callback, StubCall<R> call) throws Throwable {
     // get regions covered by the row range
+    ExecutorService pool = this.poolSupplier.get();
     List<byte[]> keys = getStartKeysInRange(startKey, endKey);
     Map<byte[], Future<R>> futures = new TreeMap<>(Bytes.BYTES_COMPARATOR);
     try {
@@ -443,7 +446,7 @@ class TableOverAsyncTable implements Table {
       }
     } catch (RejectedExecutionException e) {
       // maybe the connection has been closed, let's check
-      if (pool.isShutdown()) {
+      if (conn.isClosed()) {
         throw new DoNotRetryIOException("Connection is closed", e);
       } else {
         throw new HBaseIOException("Coprocessor operation is rejected", e);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java
index 8dd4709..df715c2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestConnection.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
+import com.google.protobuf.ServiceException;
 import java.io.IOException;
 import java.util.List;
 import java.util.Set;
@@ -30,6 +31,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -37,7 +39,11 @@ import org.apache.hadoop.hbase.HRegionLocation;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.coprocessor.Batch;
+import org.apache.hadoop.hbase.coprocessor.MultiRowMutationEndpoint;
 import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MultiRowMutationService;
+import org.apache.hadoop.hbase.protobuf.generated.MultiRowMutationProtos.MutateRowsResponse;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -336,7 +342,7 @@ public class TestConnection {
     TEST_UTIL.getAdmin().createTable(builder.build());
 
     try (Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
-        RegionLocator locator = conn.getRegionLocator(tableName)) {
+      RegionLocator locator = conn.getRegionLocator(tableName)) {
       // Get locations of the regions of the table
       List<HRegionLocation> locations = locator.getAllRegionLocations();
 
@@ -353,4 +359,27 @@ public class TestConnection {
       TEST_UTIL.deleteTable(tableName);
     }
   }
+
+  @Test(expected = DoNotRetryIOException.class)
+  public void testClosedConnection() throws ServiceException, Throwable {
+    byte[] family = Bytes.toBytes("cf");
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableDescriptorBuilder builder = TableDescriptorBuilder.newBuilder(tableName)
+      .setCoprocessor(MultiRowMutationEndpoint.class.getName())
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(family));
+    TEST_UTIL.getAdmin().createTable(builder.build());
+
+    Connection conn = ConnectionFactory.createConnection(TEST_UTIL.getConfiguration());
+    // cache the location
+    try (Table table = conn.getTable(tableName)) {
+      table.get(new Get(Bytes.toBytes(0)));
+    } finally {
+      conn.close();
+    }
+    Batch.Call<MultiRowMutationService, MutateRowsResponse> callable = service -> {
+      throw new RuntimeException("Should not arrive here");
+    };
+    conn.getTable(tableName).coprocessorService(MultiRowMutationService.class,
+      HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW, callable);
+  }
 }
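
For context, HBASE-22550 boils down to two things: the batch pool is created lazily under a
lock that also checks whether the connection is already closed, and the pool is handed to
TableOverAsyncTable as an IOExceptionSupplier so that check only runs when a coprocessor call
actually needs the pool. A generic sketch of the lazy-pool-with-closed-check part, with
hypothetical class and method names (not the HBase ones), might look like this:

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class LazyPoolOwner implements AutoCloseable {

  private final AtomicBoolean closed = new AtomicBoolean(false);

  private volatile ExecutorService pool;

  ExecutorService getPool() throws IOException {
    ExecutorService p = pool;
    if (p == null) {
      synchronized (this) {
        // Re-check under the lock: without this, close() could run first and nobody would
        // ever shut down the pool we are about to create.
        if (closed.get()) {
          throw new IOException("Connection is closed");
        }
        if (pool == null) {
          pool = Executors.newCachedThreadPool();
        }
        p = pool;
      }
    }
    return p;
  }

  @Override
  public synchronized void close() {
    if (closed.compareAndSet(false, true) && pool != null) {
      pool.shutdown();
    }
  }
}

In the commit itself the exception thrown is a DoNotRetryIOException, and TableOverAsyncTable
additionally maps a RejectedExecutionException to DoNotRetryIOException when the connection
turns out to be closed.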