You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bh...@apache.org on 2020/07/30 17:44:59 UTC

[hbase] branch branch-2.2 updated: HBASE-24788: Fix the connection leaks on getting hbase admin from unclosed connection (#2173)

This is an automated email from the ASF dual-hosted git repository.

bharathv pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new a1ce200  HBASE-24788: Fix the connection leaks on getting hbase admin from unclosed connection (#2173)
a1ce200 is described below

commit a1ce200522ef90a05faa64d13210c734635c59cf
Author: Sandeep Pal <50...@users.noreply.github.com>
AuthorDate: Thu Jul 30 10:32:32 2020 -0700

    HBASE-24788: Fix the connection leaks on getting hbase admin from unclosed connection (#2173)
    
    Signed-off-by: Ted Yu <ty...@apache.org>
    Signed-off-by: Viraj Jasani <vj...@apache.org>
    Signed-off-by: Bharath Vissapragada <bh...@apache.org>
    (cherry picked from commit 6c61c50df7f8b6fccf4c0e7690b3a2f8299ea01d)
---
 .../hadoop/hbase/mapreduce/TableOutputFormat.java  | 17 +++++-----
 .../hadoop/hbase/util/ServerRegionReplicaUtil.java | 26 ++++++++-------
 .../TestAsyncReplicationAdminApiWithClusters.java  | 14 ++++++--
 .../regionserver/TestClearRegionBlockCache.java    | 37 ++++++++++++----------
 .../hbase/replication/TestMasterReplication.java   | 17 +++++-----
 5 files changed, 63 insertions(+), 48 deletions(-)

diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index 78be5af..52f3402 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -19,9 +19,6 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -41,6 +38,9 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
@@ -155,11 +155,10 @@ implements Configurable {
    * @param context  The current task context.
    * @return The newly created writer instance.
    * @throws IOException When creating the writer fails.
-   * @throws InterruptedException When the jobs is cancelled.
    */
   @Override
   public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
-  throws IOException, InterruptedException {
+    throws IOException {
     return new TableRecordWriter();
   }
 
@@ -168,18 +167,18 @@ implements Configurable {
    *
    * @param context  The current context.
    * @throws IOException When the check fails.
-   * @throws InterruptedException When the job is aborted.
    * @see OutputFormat#checkOutputSpecs(JobContext)
    */
   @Override
-  public void checkOutputSpecs(JobContext context) throws IOException,
-      InterruptedException {
+  public void checkOutputSpecs(JobContext context)
+    throws IOException {
     Configuration hConf = getConf();
     if (hConf == null) {
       hConf = context.getConfiguration();
     }
 
-    try (Admin admin = ConnectionFactory.createConnection(hConf).getAdmin()) {
+    try (Connection connection = ConnectionFactory.createConnection(hConf);
+      Admin admin = connection.getAdmin()) {
       TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE));
       if (!admin.tableExists(tableName)) {
         throw new TableNotFoundException("Can't write, table does not exist:" +
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 0609733..0661a4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
 import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -161,25 +162,26 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
     if (!isRegionReplicaReplicationEnabled(conf)) {
       return;
     }
-    Admin admin = ConnectionFactory.createConnection(conf).getAdmin();
-    ReplicationPeerConfig peerConfig = null;
-    try {
-      peerConfig = admin.getReplicationPeerConfig(REGION_REPLICA_REPLICATION_PEER);
-    } catch (ReplicationPeerNotFoundException e) {
-      LOG.warn("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER
-          + " not exist", e);
-    }
-    try {
+
+    try (Connection connection = ConnectionFactory.createConnection(conf);
+      Admin admin = connection.getAdmin()) {
+      ReplicationPeerConfig peerConfig = null;
+      try {
+        peerConfig = admin.getReplicationPeerConfig(REGION_REPLICA_REPLICATION_PEER);
+      } catch (ReplicationPeerNotFoundException e) {
+        LOG.warn(
+          "Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER + " not exist",
+          e);
+      }
+
       if (peerConfig == null) {
         LOG.info("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER
-            + " not exist. Creating...");
+          + " not exist. Creating...");
         peerConfig = new ReplicationPeerConfig();
         peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf));
         peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName());
         admin.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig);
       }
-    } finally {
-      admin.close();
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
index b22caa2..8e60aad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -64,6 +66,7 @@ public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase
   private static HBaseTestingUtility TEST_UTIL2;
   private static Configuration conf2;
   private static AsyncAdmin admin2;
+  private static AsyncConnection connection;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -78,14 +81,21 @@ public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase
     conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
     TEST_UTIL2 = new HBaseTestingUtility(conf2);
     TEST_UTIL2.startMiniCluster();
-    admin2 =
-        ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get().getAdmin();
+
+    connection =
+      ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get();
+    admin2 = connection.getAdmin();
 
     ReplicationPeerConfig rpc = new ReplicationPeerConfig();
     rpc.setClusterKey(TEST_UTIL2.getClusterKey());
     ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
   }
 
+  @AfterClass
+  public static void clearUp() throws IOException {
+    connection.close();
+  }
+
   @Override
   @After
   public void tearDown() throws Exception {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClearRegionBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClearRegionBlockCache.java
index 3c8e8be..f8bc331 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClearRegionBlockCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClearRegionBlockCache.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.AsyncAdmin;
+import org.apache.hadoop.hbase.client.AsyncConnection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.Table;
@@ -147,27 +148,29 @@ public class TestClearRegionBlockCache {
 
   @Test
   public void testClearBlockCacheFromAsyncAdmin() throws Exception {
-    AsyncAdmin admin =
-        ConnectionFactory.createAsyncConnection(HTU.getConfiguration()).get().getAdmin();
-
-    BlockCache blockCache1 = rs1.getBlockCache().get();
-    BlockCache blockCache2 = rs2.getBlockCache().get();
-    long initialBlockCount1 = blockCache1.getBlockCount();
-    long initialBlockCount2 = blockCache2.getBlockCount();
-
-    // scan will cause blocks to be added in BlockCache
-    scanAllRegionsForRS(rs1);
-    assertEquals(blockCache1.getBlockCount() - initialBlockCount1,
+    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(HTU.getConfiguration())
+      .get()) {
+      AsyncAdmin admin = conn.getAdmin();
+
+      BlockCache blockCache1 = rs1.getBlockCache().get();
+      BlockCache blockCache2 = rs2.getBlockCache().get();
+      long initialBlockCount1 = blockCache1.getBlockCount();
+      long initialBlockCount2 = blockCache2.getBlockCount();
+
+      // scan will cause blocks to be added in BlockCache
+      scanAllRegionsForRS(rs1);
+      assertEquals(blockCache1.getBlockCount() - initialBlockCount1,
         HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY));
-    scanAllRegionsForRS(rs2);
-    assertEquals(blockCache2.getBlockCount() - initialBlockCount2,
+      scanAllRegionsForRS(rs2);
+      assertEquals(blockCache2.getBlockCount() - initialBlockCount2,
         HTU.getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
 
-    CacheEvictionStats stats = admin.clearBlockCache(TABLE_NAME).get();
-    assertEquals(stats.getEvictedBlocks(), HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY) + HTU
+      CacheEvictionStats stats = admin.clearBlockCache(TABLE_NAME).get();
+      assertEquals(stats.getEvictedBlocks(), HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY) + HTU
         .getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
-    assertEquals(initialBlockCount1, blockCache1.getBlockCount());
-    assertEquals(initialBlockCount2, blockCache2.getBlockCount());
+      assertEquals(initialBlockCount1, blockCache1.getBlockCount());
+      assertEquals(initialBlockCount2, blockCache2.getBlockCount());
+    }
   }
 
   private void scanAllRegionsForRS(HRegionServer rs) throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 37ca7dc..e680fd5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -516,8 +517,8 @@ public class TestMasterReplication {
 
   private void addPeer(String id, int masterClusterNumber,
       int slaveClusterNumber) throws Exception {
-    try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
-        .getAdmin()) {
+    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
+      Admin admin = conn.getAdmin()) {
       admin.addReplicationPeer(id,
         new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey()));
     }
@@ -525,8 +526,8 @@ public class TestMasterReplication {
 
   private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
       throws Exception {
-    try (Admin admin =
-        ConnectionFactory.createConnection(configurations[masterClusterNumber]).getAdmin()) {
+    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
+      Admin admin = conn.getAdmin()) {
       admin.addReplicationPeer(
         id,
         new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
@@ -536,15 +537,15 @@ public class TestMasterReplication {
   }
 
   private void disablePeer(String id, int masterClusterNumber) throws Exception {
-    try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
-        .getAdmin()) {
+    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
+      Admin admin = conn.getAdmin()) {
       admin.disableReplicationPeer(id);
     }
   }
 
   private void enablePeer(String id, int masterClusterNumber) throws Exception {
-    try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
-        .getAdmin()) {
+    try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
+      Admin admin = conn.getAdmin()) {
       admin.enableReplicationPeer(id);
     }
   }