You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2020/08/02 05:23:04 UTC

[hbase] branch master updated: HBASE-24805 HBaseTestingUtility.getConnection should be threadsafe

This is an automated email from the ASF dual-hosted git repository.

busbey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 86ebbdd  HBASE-24805 HBaseTestingUtility.getConnection should be threadsafe
86ebbdd is described below

commit 86ebbdd8a2df89de37c2c3bd50e64292eaf28b11
Author: Sean Busbey <bu...@apache.org>
AuthorDate: Fri Jul 31 02:34:24 2020 -0500

    HBASE-24805 HBaseTestingUtility.getConnection should be threadsafe
    
    * refactor how we use connection and async connection to rely on their access methods
    * refactor initialization and cleanup of the shared connection
    * incompatibly change HCTU's Configuration member variable to be final so it can be safely accessed from multiple threads.
    
    Closes #2180
    
    Signed-off-by: Wellington Chevreuil <wc...@apache.org>
    Signed-off-by: Viraj Jasani <vj...@apache.org>
---
 .../hadoop/hbase/HBaseCommonTestingUtility.java    |  2 +-
 .../apache/hadoop/hbase/HBaseTestingUtility.java   | 66 ++++++++++++----------
 2 files changed, 37 insertions(+), 31 deletions(-)

diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java
index eb04cca..487c926 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java
@@ -69,7 +69,7 @@ public class HBaseCommonTestingUtility {
     Compression.Algorithm.NONE, Compression.Algorithm.GZ
   };
 
-  protected Configuration conf;
+  protected final Configuration conf;
 
   public HBaseCommonTestingUtility() {
     this(null);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 0b5f816..0c4f7fb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -25,6 +25,7 @@ import edu.umd.cs.findbugs.annotations.Nullable;
 import java.io.File;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.UncheckedIOException;
 import java.lang.reflect.Field;
 import java.lang.reflect.Modifier;
 import java.net.BindException;
@@ -207,7 +208,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
     * HBaseTestingUtility*/
   private Path dataTestDirOnTestFS = null;
 
-  private volatile AsyncClusterConnection asyncConnection;
+  private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();
 
   /** Filesystem URI used for map-reduce mini-cluster setup */
   private static String FS_URI;
@@ -1237,14 +1238,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
 
   public void restartHBaseCluster(StartMiniClusterOption option)
       throws IOException, InterruptedException {
-    if (hbaseAdmin != null) {
-      hbaseAdmin.close();
-      hbaseAdmin = null;
-    }
-    if (this.asyncConnection != null) {
-      this.asyncConnection.close();
-      this.asyncConnection = null;
-    }
+    closeConnection();
     this.hbaseCluster =
         new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
             option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
@@ -3041,11 +3035,6 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
     return hbaseCluster;
   }
 
-  private void initConnection() throws IOException {
-    User user = UserProvider.instantiate(conf).getCurrent();
-    this.asyncConnection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
-  }
-
   /**
    * Resets the connections so that the next time getConnection() is called, a new connection is
    * created. This is needed in cases where the entire cluster / all the masters are shutdown and
@@ -3067,29 +3056,46 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
   }
 
   /**
-   * Get a Connection to the cluster. Not thread-safe (This class needs a lot of work to make it
-   * thread-safe).
+   * Get a shared Connection to the cluster.
+   * this method is threadsafe.
    * @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
    */
   public Connection getConnection() throws IOException {
-    if (this.asyncConnection == null) {
-      initConnection();
-    }
-    return this.asyncConnection.toConnection();
+    return getAsyncConnection().toConnection();
   }
 
+  /**
+   * Get a shared AsyncClusterConnection to the cluster.
+   * this method is threadsafe.
+   * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown of cluster.
+   */
   public AsyncClusterConnection getAsyncConnection() throws IOException {
-    if (this.asyncConnection == null) {
-      initConnection();
+    try {
+      return asyncConnection.updateAndGet(connection -> {
+        if (connection == null) {
+          try {
+            User user = UserProvider.instantiate(conf).getCurrent();
+            connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
+          } catch(IOException ioe) {
+            throw new UncheckedIOException("Failed to create connection", ioe);
+          }
+        }
+        return connection;
+      });
+    } catch (UncheckedIOException exception) {
+      throw exception.getCause();
     }
-    return this.asyncConnection;
   }
 
   public void closeConnection() throws IOException {
-    Closeables.close(hbaseAdmin, true);
-    Closeables.close(asyncConnection, true);
-    this.hbaseAdmin = null;
-    this.asyncConnection = null;
+    if (hbaseAdmin != null) {
+      Closeables.close(hbaseAdmin, true);
+      hbaseAdmin = null;
+    }
+    AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
+    if (asyncConnection != null) {
+      Closeables.close(asyncConnection, true);
+    }
   }
 
   /**
@@ -3252,7 +3258,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
       Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
         .getRegionStates().getRegionAssignments();
       final List<Pair<RegionInfo, ServerName>> metaLocations =
-        MetaTableAccessor.getTableRegionsAndLocations(asyncConnection.toConnection(), tableName);
+        MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
       for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
         RegionInfo hri = metaLocation.getFirst();
         ServerName sn = metaLocation.getSecond();
@@ -3272,7 +3278,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
 
   public String explainTableState(final TableName table, TableState.State state)
       throws IOException {
-    TableState tableState = MetaTableAccessor.getTableState(asyncConnection.toConnection(), table);
+    TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
     if (tableState == null) {
       return "TableState in META: No table state in META for table " + table +
         " last state in meta (including deleted is " + findLastTableState(table) + ")";
@@ -3299,7 +3305,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
         return true;
       }
     };
-    MetaTableAccessor.scanMeta(asyncConnection.toConnection(), null, null,
+    MetaTableAccessor.scanMeta(getConnection(), null, null,
       ClientMetaTableAccessor.QueryType.TABLE, Integer.MAX_VALUE, visitor);
     return lastTableState.get();
   }