You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bu...@apache.org on 2020/08/02 05:23:04 UTC
[hbase] branch master updated: HBASE-24805
HBaseTestingUtility.getConnection should be threadsafe
This is an automated email from the ASF dual-hosted git repository.
busbey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 86ebbdd HBASE-24805 HBaseTestingUtility.getConnection should be threadsafe
86ebbdd is described below
commit 86ebbdd8a2df89de37c2c3bd50e64292eaf28b11
Author: Sean Busbey <bu...@apache.org>
AuthorDate: Fri Jul 31 02:34:24 2020 -0500
HBASE-24805 HBaseTestingUtility.getConnection should be threadsafe
* refactor how we use connection and async connection to rely on their access methods
* refactor initialization and cleanup of the shared connection
* incompatibly change the Configuration member variable of HBaseCommonTestingUtility (HCTU) to be final so it can be safely accessed from multiple threads.
Closes #2180
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
Signed-off-by: Viraj Jasani <vj...@apache.org>
---
.../hadoop/hbase/HBaseCommonTestingUtility.java | 2 +-
.../apache/hadoop/hbase/HBaseTestingUtility.java | 66 ++++++++++++----------
2 files changed, 37 insertions(+), 31 deletions(-)
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java
index eb04cca..487c926 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/HBaseCommonTestingUtility.java
@@ -69,7 +69,7 @@ public class HBaseCommonTestingUtility {
Compression.Algorithm.NONE, Compression.Algorithm.GZ
};
- protected Configuration conf;
+ protected final Configuration conf;
public HBaseCommonTestingUtility() {
this(null);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
index 0b5f816..0c4f7fb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
@@ -25,6 +25,7 @@ import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
+import java.io.UncheckedIOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.BindException;
@@ -207,7 +208,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
* HBaseTestingUtility*/
private Path dataTestDirOnTestFS = null;
- private volatile AsyncClusterConnection asyncConnection;
+ private final AtomicReference<AsyncClusterConnection> asyncConnection = new AtomicReference<>();
/** Filesystem URI used for map-reduce mini-cluster setup */
private static String FS_URI;
@@ -1237,14 +1238,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
public void restartHBaseCluster(StartMiniClusterOption option)
throws IOException, InterruptedException {
- if (hbaseAdmin != null) {
- hbaseAdmin.close();
- hbaseAdmin = null;
- }
- if (this.asyncConnection != null) {
- this.asyncConnection.close();
- this.asyncConnection = null;
- }
+ closeConnection();
this.hbaseCluster =
new MiniHBaseCluster(this.conf, option.getNumMasters(), option.getNumAlwaysStandByMasters(),
option.getNumRegionServers(), option.getRsPorts(), option.getMasterClass(),
@@ -3041,11 +3035,6 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
return hbaseCluster;
}
- private void initConnection() throws IOException {
- User user = UserProvider.instantiate(conf).getCurrent();
- this.asyncConnection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
- }
-
/**
* Resets the connections so that the next time getConnection() is called, a new connection is
* created. This is needed in cases where the entire cluster / all the masters are shutdown and
@@ -3067,29 +3056,46 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
}
/**
- * Get a Connection to the cluster. Not thread-safe (This class needs a lot of work to make it
- * thread-safe).
+ * Get a shared Connection to the cluster.
+ * this method is threadsafe.
* @return A Connection that can be shared. Don't close. Will be closed on shutdown of cluster.
*/
public Connection getConnection() throws IOException {
- if (this.asyncConnection == null) {
- initConnection();
- }
- return this.asyncConnection.toConnection();
+ return getAsyncConnection().toConnection();
}
+ /**
+ * Get a shared AsyncClusterConnection to the cluster.
+ * this method is threadsafe.
+ * @return An AsyncClusterConnection that can be shared. Don't close. Will be closed on shutdown of cluster.
+ */
public AsyncClusterConnection getAsyncConnection() throws IOException {
- if (this.asyncConnection == null) {
- initConnection();
+ try {
+ return asyncConnection.updateAndGet(connection -> {
+ if (connection == null) {
+ try {
+ User user = UserProvider.instantiate(conf).getCurrent();
+ connection = ClusterConnectionFactory.createAsyncClusterConnection(conf, null, user);
+ } catch(IOException ioe) {
+ throw new UncheckedIOException("Failed to create connection", ioe);
+ }
+ }
+ return connection;
+ });
+ } catch (UncheckedIOException exception) {
+ throw exception.getCause();
}
- return this.asyncConnection;
}
public void closeConnection() throws IOException {
- Closeables.close(hbaseAdmin, true);
- Closeables.close(asyncConnection, true);
- this.hbaseAdmin = null;
- this.asyncConnection = null;
+ if (hbaseAdmin != null) {
+ Closeables.close(hbaseAdmin, true);
+ hbaseAdmin = null;
+ }
+ AsyncClusterConnection asyncConnection = this.asyncConnection.getAndSet(null);
+ if (asyncConnection != null) {
+ Closeables.close(asyncConnection, true);
+ }
}
/**
@@ -3252,7 +3258,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
Map<RegionInfo, ServerName> assignments = getHBaseCluster().getMaster().getAssignmentManager()
.getRegionStates().getRegionAssignments();
final List<Pair<RegionInfo, ServerName>> metaLocations =
- MetaTableAccessor.getTableRegionsAndLocations(asyncConnection.toConnection(), tableName);
+ MetaTableAccessor.getTableRegionsAndLocations(getConnection(), tableName);
for (Pair<RegionInfo, ServerName> metaLocation : metaLocations) {
RegionInfo hri = metaLocation.getFirst();
ServerName sn = metaLocation.getSecond();
@@ -3272,7 +3278,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
public String explainTableState(final TableName table, TableState.State state)
throws IOException {
- TableState tableState = MetaTableAccessor.getTableState(asyncConnection.toConnection(), table);
+ TableState tableState = MetaTableAccessor.getTableState(getConnection(), table);
if (tableState == null) {
return "TableState in META: No table state in META for table " + table +
" last state in meta (including deleted is " + findLastTableState(table) + ")";
@@ -3299,7 +3305,7 @@ public class HBaseTestingUtility extends HBaseZKTestingUtility {
return true;
}
};
- MetaTableAccessor.scanMeta(asyncConnection.toConnection(), null, null,
+ MetaTableAccessor.scanMeta(getConnection(), null, null,
ClientMetaTableAccessor.QueryType.TABLE, Integer.MAX_VALUE, visitor);
return lastTableState.get();
}