You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by bh...@apache.org on 2020/07/30 17:44:59 UTC
[hbase] branch branch-2.2 updated: HBASE-24788: Fix the connection leaks on getting hbase admin from unclosed connection (#2173)
This is an automated email from the ASF dual-hosted git repository.
bharathv pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.2 by this push:
new a1ce200 HBASE-24788: Fix the connection leaks on getting hbase admin from unclosed connection (#2173)
a1ce200 is described below
commit a1ce200522ef90a05faa64d13210c734635c59cf
Author: Sandeep Pal <50...@users.noreply.github.com>
AuthorDate: Thu Jul 30 10:32:32 2020 -0700
HBASE-24788: Fix the connection leaks on getting hbase admin from unclosed connection (#2173)
Signed-off-by: Ted Yu <ty...@apache.org>
Signed-off-by: Viraj Jasani <vj...@apache.org>
Signed-off-by: Bharath Vissapragada <bh...@apache.org>
(cherry picked from commit 6c61c50df7f8b6fccf4c0e7690b3a2f8299ea01d)
---
.../hadoop/hbase/mapreduce/TableOutputFormat.java | 17 +++++-----
.../hadoop/hbase/util/ServerRegionReplicaUtil.java | 26 ++++++++-------
.../TestAsyncReplicationAdminApiWithClusters.java | 14 ++++++--
.../regionserver/TestClearRegionBlockCache.java | 37 ++++++++++++----------
.../hbase/replication/TestMasterReplication.java | 17 +++++-----
5 files changed, 63 insertions(+), 48 deletions(-)
diff --git a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
index 78be5af..52f3402 100644
--- a/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
+++ b/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableOutputFormat.java
@@ -19,9 +19,6 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -41,6 +38,9 @@ import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Convert Map/Reduce output and write it to an HBase table. The KEY is ignored
@@ -155,11 +155,10 @@ implements Configurable {
* @param context The current task context.
* @return The newly created writer instance.
* @throws IOException When creating the writer fails.
- * @throws InterruptedException When the jobs is cancelled.
*/
@Override
public RecordWriter<KEY, Mutation> getRecordWriter(TaskAttemptContext context)
- throws IOException, InterruptedException {
+ throws IOException {
return new TableRecordWriter();
}
@@ -168,18 +167,18 @@ implements Configurable {
*
* @param context The current context.
* @throws IOException When the check fails.
- * @throws InterruptedException When the job is aborted.
* @see OutputFormat#checkOutputSpecs(JobContext)
*/
@Override
- public void checkOutputSpecs(JobContext context) throws IOException,
- InterruptedException {
+ public void checkOutputSpecs(JobContext context)
+ throws IOException {
Configuration hConf = getConf();
if (hConf == null) {
hConf = context.getConfiguration();
}
- try (Admin admin = ConnectionFactory.createConnection(hConf).getAdmin()) {
+ try (Connection connection = ConnectionFactory.createConnection(hConf);
+ Admin admin = connection.getAdmin()) {
TableName tableName = TableName.valueOf(hConf.get(OUTPUT_TABLE));
if (!admin.tableExists(tableName)) {
throw new TableNotFoundException("Can't write, table does not exist:" +
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 0609733..0661a4d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
@@ -161,25 +162,26 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
if (!isRegionReplicaReplicationEnabled(conf)) {
return;
}
- Admin admin = ConnectionFactory.createConnection(conf).getAdmin();
- ReplicationPeerConfig peerConfig = null;
- try {
- peerConfig = admin.getReplicationPeerConfig(REGION_REPLICA_REPLICATION_PEER);
- } catch (ReplicationPeerNotFoundException e) {
- LOG.warn("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER
- + " not exist", e);
- }
- try {
+
+ try (Connection connection = ConnectionFactory.createConnection(conf);
+ Admin admin = connection.getAdmin()) {
+ ReplicationPeerConfig peerConfig = null;
+ try {
+ peerConfig = admin.getReplicationPeerConfig(REGION_REPLICA_REPLICATION_PEER);
+ } catch (ReplicationPeerNotFoundException e) {
+ LOG.warn(
+ "Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER + " not exist",
+ e);
+ }
+
if (peerConfig == null) {
LOG.info("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER
- + " not exist. Creating...");
+ + " not exist. Creating...");
peerConfig = new ReplicationPeerConfig();
peerConfig.setClusterKey(ZKConfig.getZooKeeperClusterKey(conf));
peerConfig.setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName());
admin.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig);
}
- } finally {
- admin.close();
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
index b22caa2..8e60aad 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApiWithClusters.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
+import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -64,6 +66,7 @@ public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase
private static HBaseTestingUtility TEST_UTIL2;
private static Configuration conf2;
private static AsyncAdmin admin2;
+ private static AsyncConnection connection;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -78,14 +81,21 @@ public class TestAsyncReplicationAdminApiWithClusters extends TestAsyncAdminBase
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
TEST_UTIL2 = new HBaseTestingUtility(conf2);
TEST_UTIL2.startMiniCluster();
- admin2 =
- ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get().getAdmin();
+
+ connection =
+ ConnectionFactory.createAsyncConnection(TEST_UTIL2.getConfiguration()).get();
+ admin2 = connection.getAdmin();
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(TEST_UTIL2.getClusterKey());
ASYNC_CONN.getAdmin().addReplicationPeer(ID_SECOND, rpc).join();
}
+ @AfterClass
+ public static void clearUp() throws IOException {
+ connection.close();
+ }
+
@Override
@After
public void tearDown() throws Exception {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClearRegionBlockCache.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClearRegionBlockCache.java
index 3c8e8be..f8bc331 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClearRegionBlockCache.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestClearRegionBlockCache.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.AsyncAdmin;
+import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
@@ -147,27 +148,29 @@ public class TestClearRegionBlockCache {
@Test
public void testClearBlockCacheFromAsyncAdmin() throws Exception {
- AsyncAdmin admin =
- ConnectionFactory.createAsyncConnection(HTU.getConfiguration()).get().getAdmin();
-
- BlockCache blockCache1 = rs1.getBlockCache().get();
- BlockCache blockCache2 = rs2.getBlockCache().get();
- long initialBlockCount1 = blockCache1.getBlockCount();
- long initialBlockCount2 = blockCache2.getBlockCount();
-
- // scan will cause blocks to be added in BlockCache
- scanAllRegionsForRS(rs1);
- assertEquals(blockCache1.getBlockCount() - initialBlockCount1,
+ try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(HTU.getConfiguration())
+ .get()) {
+ AsyncAdmin admin = conn.getAdmin();
+
+ BlockCache blockCache1 = rs1.getBlockCache().get();
+ BlockCache blockCache2 = rs2.getBlockCache().get();
+ long initialBlockCount1 = blockCache1.getBlockCount();
+ long initialBlockCount2 = blockCache2.getBlockCount();
+
+ // scan will cause blocks to be added in BlockCache
+ scanAllRegionsForRS(rs1);
+ assertEquals(blockCache1.getBlockCount() - initialBlockCount1,
HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY));
- scanAllRegionsForRS(rs2);
- assertEquals(blockCache2.getBlockCount() - initialBlockCount2,
+ scanAllRegionsForRS(rs2);
+ assertEquals(blockCache2.getBlockCount() - initialBlockCount2,
HTU.getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
- CacheEvictionStats stats = admin.clearBlockCache(TABLE_NAME).get();
- assertEquals(stats.getEvictedBlocks(), HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY) + HTU
+ CacheEvictionStats stats = admin.clearBlockCache(TABLE_NAME).get();
+ assertEquals(stats.getEvictedBlocks(), HTU.getNumHFilesForRS(rs1, TABLE_NAME, FAMILY) + HTU
.getNumHFilesForRS(rs2, TABLE_NAME, FAMILY));
- assertEquals(initialBlockCount1, blockCache1.getBlockCount());
- assertEquals(initialBlockCount2, blockCache2.getBlockCount());
+ assertEquals(initialBlockCount1, blockCache1.getBlockCount());
+ assertEquals(initialBlockCount2, blockCache2.getBlockCount());
+ }
}
private void scanAllRegionsForRS(HRegionServer rs) throws IOException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 37ca7dc..e680fd5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -516,8 +517,8 @@ public class TestMasterReplication {
private void addPeer(String id, int masterClusterNumber,
int slaveClusterNumber) throws Exception {
- try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
- .getAdmin()) {
+ try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
+ Admin admin = conn.getAdmin()) {
admin.addReplicationPeer(id,
new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey()));
}
@@ -525,8 +526,8 @@ public class TestMasterReplication {
private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
throws Exception {
- try (Admin admin =
- ConnectionFactory.createConnection(configurations[masterClusterNumber]).getAdmin()) {
+ try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
+ Admin admin = conn.getAdmin()) {
admin.addReplicationPeer(
id,
new ReplicationPeerConfig().setClusterKey(utilities[slaveClusterNumber].getClusterKey())
@@ -536,15 +537,15 @@ public class TestMasterReplication {
}
private void disablePeer(String id, int masterClusterNumber) throws Exception {
- try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
- .getAdmin()) {
+ try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
+ Admin admin = conn.getAdmin()) {
admin.disableReplicationPeer(id);
}
}
private void enablePeer(String id, int masterClusterNumber) throws Exception {
- try (Admin admin = ConnectionFactory.createConnection(configurations[masterClusterNumber])
- .getAdmin()) {
+ try (Connection conn = ConnectionFactory.createConnection(configurations[masterClusterNumber]);
+ Admin admin = conn.getAdmin()) {
admin.enableReplicationPeer(id);
}
}