You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/12/23 04:04:51 UTC

[hbase] 01/02: HBASE-26598 Fix excessive connections in MajorCompactor (#3961)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 8e56696748ac9c818c5500e4d3704124fee8f3cb
Author: Samir Khan <mu...@gmail.com>
AuthorDate: Wed Dec 22 20:29:48 2021 -0600

    HBASE-26598 Fix excessive connections in MajorCompactor (#3961)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../util/compaction/MajorCompactionRequest.java    | 53 ++++++++++------------
 .../util/compaction/MajorCompactionTTLRequest.java | 33 ++++++--------
 .../hbase/util/compaction/MajorCompactor.java      |  6 +--
 .../hbase/util/compaction/MajorCompactorTTL.java   |  4 +-
 .../compaction/TestMajorCompactionRequest.java     | 20 ++++----
 .../compaction/TestMajorCompactionTTLRequest.java  | 18 +++-----
 6 files changed, 57 insertions(+), 77 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
index 22ec6cb..f765b35 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionRequest.java
@@ -22,13 +22,11 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Optional;
 import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.regionserver.HRegionFileSystem;
 import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
@@ -45,26 +43,26 @@ class MajorCompactionRequest {
 
   private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionRequest.class);
 
-  protected final Configuration configuration;
+  protected final Connection connection;
   protected final RegionInfo region;
   private Set<String> stores;
 
-  MajorCompactionRequest(Configuration configuration, RegionInfo region) {
-    this.configuration = configuration;
+  MajorCompactionRequest(Connection connection, RegionInfo region) {
+    this.connection = connection;
     this.region = region;
   }
 
-  MajorCompactionRequest(Configuration configuration, RegionInfo region,
+  MajorCompactionRequest(Connection connection, RegionInfo region,
       Set<String> stores) {
-    this(configuration, region);
+    this(connection, region);
     this.stores = stores;
   }
 
-  static Optional<MajorCompactionRequest> newRequest(Configuration configuration, RegionInfo info,
+  static Optional<MajorCompactionRequest> newRequest(Connection connection, RegionInfo info,
       Set<String> stores, long timestamp) throws IOException {
     MajorCompactionRequest request =
-        new MajorCompactionRequest(configuration, info, stores);
-    return request.createRequest(configuration, stores, timestamp);
+        new MajorCompactionRequest(connection, info, stores);
+    return request.createRequest(connection, stores, timestamp);
   }
 
   RegionInfo getRegion() {
@@ -79,28 +77,26 @@ class MajorCompactionRequest {
     this.stores = stores;
   }
 
-  Optional<MajorCompactionRequest> createRequest(Configuration configuration,
+  Optional<MajorCompactionRequest> createRequest(Connection connection,
       Set<String> stores, long timestamp) throws IOException {
     Set<String> familiesToCompact = getStoresRequiringCompaction(stores, timestamp);
     MajorCompactionRequest request = null;
     if (!familiesToCompact.isEmpty()) {
-      request = new MajorCompactionRequest(configuration, region, familiesToCompact);
+      request = new MajorCompactionRequest(connection, region, familiesToCompact);
     }
     return Optional.ofNullable(request);
   }
 
   Set<String> getStoresRequiringCompaction(Set<String> requestedStores, long timestamp)
       throws IOException {
-    try(Connection connection = getConnection(configuration)) {
-      HRegionFileSystem fileSystem = getFileSystem(connection);
-      Set<String> familiesToCompact = Sets.newHashSet();
-      for (String family : requestedStores) {
-        if (shouldCFBeCompacted(fileSystem, family, timestamp)) {
-          familiesToCompact.add(family);
-        }
+    HRegionFileSystem fileSystem = getFileSystem();
+    Set<String> familiesToCompact = Sets.newHashSet();
+    for (String family : requestedStores) {
+      if (shouldCFBeCompacted(fileSystem, family, timestamp)) {
+        familiesToCompact.add(family);
       }
-      return familiesToCompact;
     }
+    return familiesToCompact;
   }
 
   boolean shouldCFBeCompacted(HRegionFileSystem fileSystem, String family, long ts)
@@ -142,10 +138,6 @@ class MajorCompactionRequest {
     return false;
   }
 
-  Connection getConnection(Configuration configuration) throws IOException {
-    return ConnectionFactory.createConnection(configuration);
-  }
-
   protected boolean familyHasReferenceFile(HRegionFileSystem fileSystem, String family, long ts)
       throws IOException {
     List<Path> referenceFiles =
@@ -167,12 +159,13 @@ class MajorCompactionRequest {
     return FSUtils.getReferenceFilePaths(fileSystem, familyDir);
   }
 
-  HRegionFileSystem getFileSystem(Connection connection) throws IOException {
-    Admin admin = connection.getAdmin();
-    return HRegionFileSystem.openRegionFromFileSystem(admin.getConfiguration(),
-      CommonFSUtils.getCurrentFileSystem(admin.getConfiguration()), CommonFSUtils.getTableDir(
-        CommonFSUtils.getRootDir(admin.getConfiguration()), region.getTable()),
-      region, true);
+  HRegionFileSystem getFileSystem() throws IOException {
+    try (Admin admin = connection.getAdmin()) {
+      return HRegionFileSystem.openRegionFromFileSystem(admin.getConfiguration(),
+        CommonFSUtils.getCurrentFileSystem(admin.getConfiguration()),
+        CommonFSUtils.getTableDir(CommonFSUtils.getRootDir(admin.getConfiguration()),
+          region.getTable()), region, true);
+    }
   }
 
   @Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionTTLRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionTTLRequest.java
index 0eda459..ecedebe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionTTLRequest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactionTTLRequest.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
 import org.apache.hadoop.hbase.client.Connection;
@@ -44,40 +43,38 @@ public class MajorCompactionTTLRequest extends MajorCompactionRequest {
 
   private static final Logger LOG = LoggerFactory.getLogger(MajorCompactionTTLRequest.class);
 
-  MajorCompactionTTLRequest(Configuration conf, RegionInfo region) {
-    super(conf, region);
+  MajorCompactionTTLRequest(Connection connection, RegionInfo region) {
+    super(connection, region);
   }
 
-  static Optional<MajorCompactionRequest> newRequest(Configuration conf, RegionInfo info,
+  static Optional<MajorCompactionRequest> newRequest(Connection connection, RegionInfo info,
       TableDescriptor htd) throws IOException {
-    MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(conf, info);
-    return request.createRequest(conf, htd);
+    MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(connection, info);
+    return request.createRequest(connection, htd);
   }
 
-  private Optional<MajorCompactionRequest> createRequest(Configuration conf, TableDescriptor htd)
+  private Optional<MajorCompactionRequest> createRequest(Connection connection, TableDescriptor htd)
       throws IOException {
     Map<String, Long> familiesToCompact = getStoresRequiringCompaction(htd);
     MajorCompactionRequest request = null;
     if (!familiesToCompact.isEmpty()) {
       LOG.debug("Compaction families for region: " + region + " CF: " + familiesToCompact.keySet());
-      request = new MajorCompactionTTLRequest(conf, region);
+      request = new MajorCompactionTTLRequest(connection, region);
     }
     return Optional.ofNullable(request);
   }
 
   Map<String, Long> getStoresRequiringCompaction(TableDescriptor htd) throws IOException {
-    try(Connection connection = getConnection(configuration)) {
-      HRegionFileSystem fileSystem = getFileSystem(connection);
-      Map<String, Long> familyTTLMap = Maps.newHashMap();
-      for (ColumnFamilyDescriptor descriptor : htd.getColumnFamilies()) {
-        long ts = getColFamilyCutoffTime(descriptor);
-        // If the table's TTL is forever, lets not compact any of the regions.
-        if (ts > 0 && shouldCFBeCompacted(fileSystem, descriptor.getNameAsString(), ts)) {
-          familyTTLMap.put(descriptor.getNameAsString(), ts);
-        }
+    HRegionFileSystem fileSystem = getFileSystem();
+    Map<String, Long> familyTTLMap = Maps.newHashMap();
+    for (ColumnFamilyDescriptor descriptor : htd.getColumnFamilies()) {
+      long ts = getColFamilyCutoffTime(descriptor);
+      // If the table's TTL is forever, lets not compact any of the regions.
+      if (ts > 0 && shouldCFBeCompacted(fileSystem, descriptor.getNameAsString(), ts)) {
+        familyTTLMap.put(descriptor.getNameAsString(), ts);
       }
-      return familyTTLMap;
     }
+    return familyTTLMap;
   }
 
   // If the CF has no TTL, return -1, else return the current time - TTL.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
index 370a3e8..d4dc419 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactor.java
@@ -201,8 +201,7 @@ public class MajorCompactor extends Configured implements Tool {
 
   protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
       throws IOException {
-    return MajorCompactionRequest.newRequest(connection.getConfiguration(), hri, storesToCompact,
-            timestamp);
+    return MajorCompactionRequest.newRequest(connection, hri, storesToCompact, timestamp);
   }
 
   private Collection<ServerName> getServersToCompact(Set<ServerName> snSet) {
@@ -351,8 +350,7 @@ public class MajorCompactor extends Configured implements Tool {
       for (HRegionLocation location : locations) {
         if (location.getRegion().getRegionId() > timestamp) {
           Optional<MajorCompactionRequest> compactionRequest = MajorCompactionRequest
-              .newRequest(connection.getConfiguration(), location.getRegion(), storesToCompact,
-                  timestamp);
+              .newRequest(connection, location.getRegion(), storesToCompact, timestamp);
           compactionRequest.ifPresent(request -> clusterCompactionQueues
               .addToCompactionQueue(location.getServerName(), request));
         }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTTL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTTL.java
index 7c92c53..c6ea5af 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTTL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/compaction/MajorCompactorTTL.java
@@ -76,7 +76,7 @@ public class MajorCompactorTTL extends MajorCompactor {
   @Override
   protected Optional<MajorCompactionRequest> getMajorCompactionRequest(RegionInfo hri)
       throws IOException {
-    return MajorCompactionTTLRequest.newRequest(connection.getConfiguration(), hri, htd);
+    return MajorCompactionTTLRequest.newRequest(connection, hri, htd);
   }
 
   @Override
@@ -171,4 +171,4 @@ public class MajorCompactorTTL extends MajorCompactor {
   public static void main(String[] args) throws Exception {
     ToolRunner.run(HBaseConfiguration.create(), new MajorCompactorTTL(), args);
   }
-}
\ No newline at end of file
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionRequest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionRequest.java
index c125c6e..54f00f9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionRequest.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionRequest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
@@ -36,7 +35,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.commons.lang3.RandomStringUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -81,13 +79,13 @@ public class TestMajorCompactionRequest {
     List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 5, 10);
     MajorCompactionRequest request = makeMockRequest(storeFiles, false);
     Optional<MajorCompactionRequest> result =
-        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
+        request.createRequest(mock(Connection.class), Sets.newHashSet(FAMILY), 100);
     assertTrue(result.isPresent());
 
     // store files newer than timestamp
     storeFiles = mockStoreFiles(regionStoreDir, 5, 101);
     request = makeMockRequest(storeFiles, false);
-    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
+    result = request.createRequest(mock(Connection.class), Sets.newHashSet(FAMILY), 100);
     assertFalse(result.isPresent());
   }
 
@@ -100,19 +98,18 @@ public class TestMajorCompactionRequest {
     HRegion region =
         HBaseTestingUtility.createRegionAndWAL(hri, rootRegionDir, UTILITY.getConfiguration(), htd);
 
-    Configuration configuration = mock(Configuration.class);
+    Connection connection = mock(Connection.class);
     // the reference file timestamp is newer
     List<StoreFileInfo> storeFiles = mockStoreFiles(regionStoreDir, 4, 101);
     List<Path> paths = storeFiles.stream().map(StoreFileInfo::getPath).collect(Collectors.toList());
     // the files that are referenced are older, thus we still compact.
     HRegionFileSystem fileSystem =
         mockFileSystem(region.getRegionInfo(), true, storeFiles, 50);
-    MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(configuration,
+    MajorCompactionRequest majorCompactionRequest = spy(new MajorCompactionRequest(connection,
         region.getRegionInfo(), Sets.newHashSet(FAMILY)));
-    doReturn(mock(Connection.class)).when(majorCompactionRequest).getConnection(eq(configuration));
     doReturn(paths).when(majorCompactionRequest).getReferenceFilePaths(any(FileSystem.class),
         any(Path.class));
-    doReturn(fileSystem).when(majorCompactionRequest).getFileSystem(any(Connection.class));
+    doReturn(fileSystem).when(majorCompactionRequest).getFileSystem();
     Set<String> result =
         majorCompactionRequest.getStoresRequiringCompaction(Sets.newHashSet("a"), 100);
     assertEquals(FAMILY, Iterables.getOnlyElement(result));
@@ -158,16 +155,15 @@ public class TestMajorCompactionRequest {
 
   private MajorCompactionRequest makeMockRequest(List<StoreFileInfo> storeFiles,
       boolean references) throws IOException {
-    Configuration configuration = mock(Configuration.class);
+    Connection connection = mock(Connection.class);
     RegionInfo regionInfo = mock(RegionInfo.class);
     when(regionInfo.getEncodedName()).thenReturn("HBase");
     when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
     MajorCompactionRequest request =
-        new MajorCompactionRequest(configuration, regionInfo, Sets.newHashSet("a"));
+        new MajorCompactionRequest(connection, regionInfo, Sets.newHashSet("a"));
     MajorCompactionRequest spy = spy(request);
     HRegionFileSystem fileSystem = mockFileSystem(regionInfo, references, storeFiles);
-    doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
-    doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
+    doReturn(fileSystem).when(spy).getFileSystem();
     return spy;
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionTTLRequest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionTTLRequest.java
index f15b887..c9dc6f5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionTTLRequest.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/compaction/TestMajorCompactionTTLRequest.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hbase.util.compaction;
 
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
@@ -31,7 +29,6 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.TableName;
@@ -72,29 +69,28 @@ public class TestMajorCompactionTTLRequest extends TestMajorCompactionRequest {
     MajorCompactionTTLRequest request = makeMockRequest(storeFiles);
     // All files are <= 100, so region should not be compacted.
     Optional<MajorCompactionRequest> result =
-        request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 10);
+        request.createRequest(mock(Connection.class), Sets.newHashSet(FAMILY), 10);
     assertFalse(result.isPresent());
 
     // All files are <= 100, so region should not be compacted yet.
-    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 100);
+    result = request.createRequest(mock(Connection.class), Sets.newHashSet(FAMILY), 100);
     assertFalse(result.isPresent());
 
     // All files are <= 100, so they should be considered for compaction
-    result = request.createRequest(mock(Configuration.class), Sets.newHashSet(FAMILY), 101);
+    result = request.createRequest(mock(Connection.class), Sets.newHashSet(FAMILY), 101);
     assertTrue(result.isPresent());
   }
 
   private MajorCompactionTTLRequest makeMockRequest(List<StoreFileInfo> storeFiles)
       throws IOException {
-    Configuration configuration = mock(Configuration.class);
+    Connection connection = mock(Connection.class);
     RegionInfo regionInfo = mock(RegionInfo.class);
     when(regionInfo.getEncodedName()).thenReturn("HBase");
     when(regionInfo.getTable()).thenReturn(TableName.valueOf("foo"));
-    MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(configuration, regionInfo);
+    MajorCompactionTTLRequest request = new MajorCompactionTTLRequest(connection, regionInfo);
     MajorCompactionTTLRequest spy = spy(request);
     HRegionFileSystem fileSystem = mockFileSystem(regionInfo, false, storeFiles);
-    doReturn(fileSystem).when(spy).getFileSystem(isA(Connection.class));
-    doReturn(mock(Connection.class)).when(spy).getConnection(eq(configuration));
+    doReturn(fileSystem).when(spy).getFileSystem();
     return spy;
   }
-}
\ No newline at end of file
+}