You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/03/03 00:28:37 UTC

[hbase] branch master updated: HBASE-21978 Should close AsyncRegistry if we fail to get cluster id when creating AsyncConnection

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e12e88  HBASE-21978 Should close AsyncRegistry if we fail to get cluster id when creating AsyncConnection
1e12e88 is described below

commit 1e12e884925f09abbc8c1231c7e000e875e961c6
Author: zhangduo <zh...@apache.org>
AuthorDate: Sat Mar 2 15:57:35 2019 +0800

    HBASE-21978 Should close AsyncRegistry if we fail to get cluster id when creating AsyncConnection
    
    Signed-off-by: Guanghao Zhang <zg...@apache.org>
---
 .../hadoop/hbase/client/ConnectionFactory.java     |  4 +-
 .../hadoop/hbase/client/TestAsyncRegistryLeak.java | 90 ++++++++++++++++++++++
 2 files changed, 93 insertions(+), 1 deletion(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
index e3e87f6..b36485f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionFactory.java
@@ -277,7 +277,6 @@ public class ConnectionFactory {
    * @param conf configuration
    * @param user the user the asynchronous connection is for
    * @return AsyncConnection object wrapped by CompletableFuture
-   * @throws IOException
    */
   public static CompletableFuture<AsyncConnection> createAsyncConnection(Configuration conf,
       final User user) {
@@ -285,10 +284,12 @@ public class ConnectionFactory {
     AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
     addListener(registry.getClusterId(), (clusterId, error) -> {
       if (error != null) {
+        registry.close();
         future.completeExceptionally(error);
         return;
       }
       if (clusterId == null) {
+        registry.close();
         future.completeExceptionally(new IOException("clusterid came back null"));
         return;
       }
@@ -299,6 +300,7 @@ public class ConnectionFactory {
           user.runAs((PrivilegedExceptionAction<? extends AsyncConnection>) () -> ReflectionUtils
             .newInstance(clazz, conf, registry, clusterId, user)));
       } catch (Exception e) {
+        registry.close();
         future.completeExceptionally(e);
       }
     });
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegistryLeak.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegistryLeak.java
new file mode 100644
index 0000000..7c21075
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncRegistryLeak.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ClientTests.class, SmallTests.class })
+public class TestAsyncRegistryLeak {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncRegistryLeak.class);
+
+  public static final class AsyncRegistryForTest extends DoNothingAsyncRegistry {
+
+    private boolean closed = false;
+
+    public AsyncRegistryForTest(Configuration conf) {
+      super(conf);
+      CREATED.add(this);
+    }
+
+    @Override
+    public CompletableFuture<String> getClusterId() {
+      return FutureUtils.failedFuture(new IOException("inject error"));
+    }
+
+    @Override
+    public void close() {
+      closed = true;
+    }
+  }
+
+  private static final List<AsyncRegistryForTest> CREATED = new ArrayList<>();
+
+  private static Configuration CONF = HBaseConfiguration.create();
+
+  @BeforeClass
+  public static void setUp() {
+    CONF.setClass(AsyncRegistryFactory.REGISTRY_IMPL_CONF_KEY, AsyncRegistryForTest.class,
+      AsyncRegistry.class);
+  }
+
+  @Test
+  public void test() throws InterruptedException {
+    for (int i = 0; i < 10; i++) {
+      try {
+        ConnectionFactory.createAsyncConnection(CONF).get();
+        fail();
+      } catch (ExecutionException e) {
+        // expected
+      }
+    }
+    assertEquals(10, CREATED.size());
+    CREATED.forEach(r -> assertTrue(r.closed));
+  }
+}