You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/10/01 22:17:56 UTC
[2/2] hbase git commit: HBASE-14485 ConnectionImplementation leaks on construction failure
HBASE-14485 ConnectionImplementation leaks on construction failure
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2e8575bb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2e8575bb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2e8575bb
Branch: refs/heads/master
Commit: 2e8575bb0ff84c2c3dbefd166213124abe86cdd1
Parents: 4da3c93
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Oct 1 12:56:37 2015 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu Oct 1 12:56:37 2015 -0700
----------------------------------------------------------------------
.../hbase/client/ConnectionImplementation.java | 87 ++++++++++----------
.../hadoop/hbase/client/RegistryFactory.java | 3 +-
.../hadoop/hbase/client/TestAsyncProcess.java | 31 ++++++-
3 files changed, 73 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/2e8575bb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index ade32a8..4e64831 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -182,44 +182,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
*/
ConnectionImplementation(Configuration conf,
ExecutorService pool, User user) throws IOException {
- this(conf);
+ this.conf = conf;
this.user = user;
this.batchPool = pool;
- this.registry = setupRegistry();
- retrieveClusterId();
-
- this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
- this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-
- // Do we publish the status?
- boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
- HConstants.STATUS_PUBLISHED_DEFAULT);
- Class<? extends ClusterStatusListener.Listener> listenerClass =
- conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
- ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
- ClusterStatusListener.Listener.class);
- if (shouldListen) {
- if (listenerClass == null) {
- LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
- ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
- } else {
- clusterStatusListener = new ClusterStatusListener(
- new ClusterStatusListener.DeadServerHandler() {
- @Override
- public void newDead(ServerName sn) {
- clearCaches(sn);
- rpcClient.cancelConnections(sn);
- }
- }, conf, listenerClass);
- }
- }
- }
-
- /**
- * For tests.
- */
- protected ConnectionImplementation(Configuration conf) {
- this.conf = conf;
this.tableConfig = new TableConfiguration(conf);
this.closed = false;
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
@@ -239,11 +204,49 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
} else {
nonceGenerator = new NoNonceGenerator();
}
- stats = ServerStatisticTracker.create(conf);
- this.asyncProcess = createAsyncProcess(this.conf);
+
+ this.stats = ServerStatisticTracker.create(conf);
this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
+ this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
+ this.asyncProcess = createAsyncProcess(this.conf);
+
+ boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
+ HConstants.STATUS_PUBLISHED_DEFAULT);
+ Class<? extends ClusterStatusListener.Listener> listenerClass =
+ conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
+ ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
+ ClusterStatusListener.Listener.class);
+
+ try {
+ this.registry = setupRegistry();
+ retrieveClusterId();
+
+ this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
+
+ // Do we publish the status?
+ if (shouldListen) {
+ if (listenerClass == null) {
+ LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
+ ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
+ } else {
+ clusterStatusListener = new ClusterStatusListener(
+ new ClusterStatusListener.DeadServerHandler() {
+ @Override
+ public void newDead(ServerName sn) {
+ clearCaches(sn);
+ rpcClient.cancelConnections(sn);
+ }
+ }, conf, listenerClass);
+ }
+ }
+ } catch (Throwable e) {
+ // avoid leaks: registry, rpcClient, ...
+ LOG.debug("connection construction failed", e);
+ close();
+ throw e;
+ }
}
/**
@@ -370,7 +373,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
synchronized (this) {
if (batchPool == null) {
this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
- conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
+ conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
this.cleanupPool = true;
}
}
@@ -478,7 +481,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
protected String clusterId = null;
- void retrieveClusterId() {
+ protected void retrieveClusterId() {
if (clusterId != null) return;
this.clusterId = this.registry.getClusterId();
if (clusterId == null) {
@@ -1979,9 +1982,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
// For tests to override.
protected AsyncProcess createAsyncProcess(Configuration conf) {
// No default pool available.
- return new AsyncProcess(this, conf, this.batchPool,
- RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false,
- RpcControllerFactory.instantiate(conf));
+ return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/2e8575bb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java
index 9adcb6f..8ce1f66 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
*/
@InterfaceAudience.Private
final class RegistryFactory {
+ static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
private RegistryFactory() {}
@@ -35,7 +36,7 @@ final class RegistryFactory {
*/
static Registry getRegistry(final Connection connection)
throws IOException {
- String registryClass = connection.getConfiguration().get("hbase.client.registry.impl",
+ String registryClass = connection.getConfiguration().get(REGISTRY_IMPL_CONF_KEY,
ZooKeeperRegistry.class.getName());
Registry registry = null;
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/2e8575bb/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index fa3ed32..7e7139a 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -342,11 +342,35 @@ public class TestAsyncProcess {
* Returns our async process.
*/
static class MyConnectionImpl extends ConnectionImplementation {
+ public static class TestRegistry implements Registry {
+ @Override
+ public void init(Connection connection) {}
+
+ @Override
+ public RegionLocations getMetaRegionLocation() throws IOException {
+ return null;
+ }
+
+ @Override
+ public String getClusterId() {
+ return "testClusterId";
+ }
+
+ @Override
+ public int getCurrentNrHRS() throws IOException {
+ return 1;
+ }
+ }
+
final AtomicInteger nbThreads = new AtomicInteger(0);
+ protected MyConnectionImpl(Configuration conf) throws IOException {
+ super(setupConf(conf), null, null);
+ }
- protected MyConnectionImpl(Configuration conf) {
- super(conf);
+ private static Configuration setupConf(Configuration conf) {
+ conf.setClass(RegistryFactory.REGISTRY_IMPL_CONF_KEY, TestRegistry.class, Registry.class);
+ return conf;
}
@Override
@@ -363,7 +387,7 @@ public class TestAsyncProcess {
List<HRegionLocation> hrl;
final boolean usedRegions[];
- protected MyConnectionImpl2(List<HRegionLocation> hrl) {
+ protected MyConnectionImpl2(List<HRegionLocation> hrl) throws IOException {
super(conf);
this.hrl = hrl;
this.usedRegions = new boolean[hrl.size()];
@@ -382,7 +406,6 @@ public class TestAsyncProcess {
}
return null;
}
-
}
@Rule