You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by mb...@apache.org on 2015/10/01 22:17:56 UTC

[2/2] hbase git commit: HBASE-14485 ConnectionImplementation leaks on construction failure

HBASE-14485 ConnectionImplementation leaks on construction failure


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/2e8575bb
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/2e8575bb
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/2e8575bb

Branch: refs/heads/master
Commit: 2e8575bb0ff84c2c3dbefd166213124abe86cdd1
Parents: 4da3c93
Author: Matteo Bertozzi <ma...@cloudera.com>
Authored: Thu Oct 1 12:56:37 2015 -0700
Committer: Matteo Bertozzi <ma...@cloudera.com>
Committed: Thu Oct 1 12:56:37 2015 -0700

----------------------------------------------------------------------
 .../hbase/client/ConnectionImplementation.java  | 87 ++++++++++----------
 .../hadoop/hbase/client/RegistryFactory.java    |  3 +-
 .../hadoop/hbase/client/TestAsyncProcess.java   | 31 ++++++-
 3 files changed, 73 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/2e8575bb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index ade32a8..4e64831 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -182,44 +182,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
    */
   ConnectionImplementation(Configuration conf,
                            ExecutorService pool, User user) throws IOException {
-    this(conf);
+    this.conf = conf;
     this.user = user;
     this.batchPool = pool;
-    this.registry = setupRegistry();
-    retrieveClusterId();
-
-    this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
-    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
-
-    // Do we publish the status?
-    boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
-        HConstants.STATUS_PUBLISHED_DEFAULT);
-    Class<? extends ClusterStatusListener.Listener> listenerClass =
-        conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
-            ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
-            ClusterStatusListener.Listener.class);
-    if (shouldListen) {
-      if (listenerClass == null) {
-        LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
-            ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
-      } else {
-        clusterStatusListener = new ClusterStatusListener(
-            new ClusterStatusListener.DeadServerHandler() {
-              @Override
-              public void newDead(ServerName sn) {
-                clearCaches(sn);
-                rpcClient.cancelConnections(sn);
-              }
-            }, conf, listenerClass);
-      }
-    }
-  }
-
-  /**
-   * For tests.
-   */
-  protected ConnectionImplementation(Configuration conf) {
-    this.conf = conf;
     this.tableConfig = new TableConfiguration(conf);
     this.closed = false;
     this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
@@ -239,11 +204,49 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     } else {
       nonceGenerator = new NoNonceGenerator();
     }
-    stats = ServerStatisticTracker.create(conf);
-    this.asyncProcess = createAsyncProcess(this.conf);
+
+    this.stats = ServerStatisticTracker.create(conf);
     this.interceptor = (new RetryingCallerInterceptorFactory(conf)).build();
+    this.rpcControllerFactory = RpcControllerFactory.instantiate(conf);
     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
     this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
+    this.asyncProcess = createAsyncProcess(this.conf);
+
+    boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
+        HConstants.STATUS_PUBLISHED_DEFAULT);
+    Class<? extends ClusterStatusListener.Listener> listenerClass =
+        conf.getClass(ClusterStatusListener.STATUS_LISTENER_CLASS,
+            ClusterStatusListener.DEFAULT_STATUS_LISTENER_CLASS,
+            ClusterStatusListener.Listener.class);
+
+    try {
+      this.registry = setupRegistry();
+      retrieveClusterId();
+
+      this.rpcClient = RpcClientFactory.createClient(this.conf, this.clusterId);
+
+      // Do we publish the status?
+      if (shouldListen) {
+        if (listenerClass == null) {
+          LOG.warn(HConstants.STATUS_PUBLISHED + " is true, but " +
+              ClusterStatusListener.STATUS_LISTENER_CLASS + " is not set - not listening status");
+        } else {
+          clusterStatusListener = new ClusterStatusListener(
+              new ClusterStatusListener.DeadServerHandler() {
+                @Override
+                public void newDead(ServerName sn) {
+                  clearCaches(sn);
+                  rpcClient.cancelConnections(sn);
+                }
+              }, conf, listenerClass);
+        }
+      }
+    } catch (Throwable e) {
+      // avoid leaks: registry, rpcClient, ...
+      LOG.debug("connection construction failed", e);
+      close();
+      throw e;
+    }
   }
 
   /**
@@ -370,7 +373,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
       synchronized (this) {
         if (batchPool == null) {
           this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
-              conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null); 
+              conf.getInt("hbase.hconnection.threads.core", 256), "-shared-", null);
           this.cleanupPool = true;
         }
       }
@@ -478,7 +481,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
 
   protected String clusterId = null;
 
-  void retrieveClusterId() {
+  protected void retrieveClusterId() {
     if (clusterId != null) return;
     this.clusterId = this.registry.getClusterId();
     if (clusterId == null) {
@@ -1979,9 +1982,7 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
   // For tests to override.
   protected AsyncProcess createAsyncProcess(Configuration conf) {
     // No default pool available.
-    return new AsyncProcess(this, conf, this.batchPool,
-        RpcRetryingCallerFactory.instantiate(conf, this.getStatisticsTracker()), false,
-        RpcControllerFactory.instantiate(conf));
+    return new AsyncProcess(this, conf, batchPool, rpcCallerFactory, false, rpcControllerFactory);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e8575bb/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java
index 9adcb6f..8ce1f66 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RegistryFactory.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
  */
 @InterfaceAudience.Private
 final class RegistryFactory {
+  static final String REGISTRY_IMPL_CONF_KEY = "hbase.client.registry.impl";
 
   private RegistryFactory() {}
 
@@ -35,7 +36,7 @@ final class RegistryFactory {
    */
   static Registry getRegistry(final Connection connection)
       throws IOException {
-    String registryClass = connection.getConfiguration().get("hbase.client.registry.impl",
+    String registryClass = connection.getConfiguration().get(REGISTRY_IMPL_CONF_KEY,
       ZooKeeperRegistry.class.getName());
     Registry registry = null;
     try {

http://git-wip-us.apache.org/repos/asf/hbase/blob/2e8575bb/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index fa3ed32..7e7139a 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -342,11 +342,35 @@ public class TestAsyncProcess {
    * Returns our async process.
    */
   static class MyConnectionImpl extends ConnectionImplementation {
+    public static class TestRegistry implements Registry {
+      @Override
+      public void init(Connection connection) {}
+
+      @Override
+      public RegionLocations getMetaRegionLocation() throws IOException {
+        return null;
+      }
+
+      @Override
+      public String getClusterId() {
+        return "testClusterId";
+      }
+
+      @Override
+      public int getCurrentNrHRS() throws IOException {
+        return 1;
+      }
+    }
+
     final AtomicInteger nbThreads = new AtomicInteger(0);
 
+    protected MyConnectionImpl(Configuration conf) throws IOException {
+      super(setupConf(conf), null, null);
+    }
 
-    protected MyConnectionImpl(Configuration conf) {
-      super(conf);
+    private static Configuration setupConf(Configuration conf) {
+      conf.setClass(RegistryFactory.REGISTRY_IMPL_CONF_KEY, TestRegistry.class, Registry.class);
+      return conf;
     }
 
     @Override
@@ -363,7 +387,7 @@ public class TestAsyncProcess {
     List<HRegionLocation> hrl;
     final boolean usedRegions[];
 
-    protected MyConnectionImpl2(List<HRegionLocation> hrl) {
+    protected MyConnectionImpl2(List<HRegionLocation> hrl) throws IOException {
       super(conf);
       this.hrl = hrl;
       this.usedRegions = new boolean[hrl.size()];
@@ -382,7 +406,6 @@ public class TestAsyncProcess {
       }
       return null;
     }
-
   }
 
   @Rule