You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ha...@apache.org on 2021/11/23 03:16:46 UTC

[hbase] 01/02: Revert "HBASE-26249 Ameliorate compaction made by bul… (#3831)"

This is an automated email from the ASF dual-hosted git repository.

haxiaolin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 2aafd31ff58e24a25689b11899ef5d94399e69cf
Author: haxiaolin <ha...@apache.org>
AuthorDate: Tue Nov 23 11:15:14 2021 +0800

    Revert "HBASE-26249 Ameliorate compaction made by bul… (#3831)"
    
    This reverts commit 5e62e2aa8d6617ff0608fff5a2f57caae0623606.
---
 .../hadoop/hbase/regionserver/CompactSplit.java    |  70 ++------------
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  12 +--
 .../compactions/CompactionRequester.java           |   6 --
 .../regionserver/TestCompactionAfterBulkLoad.java  | 101 ++++-----------------
 4 files changed, 32 insertions(+), 157 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index bb8c9b8..441b18b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -27,9 +27,7 @@ import java.io.StringWriter;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
@@ -94,7 +92,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
   private volatile ThreadPoolExecutor splits;
 
   private volatile ThroughputController compactionThroughputController;
-  private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet();
 
   private volatile boolean compactionsEnabled;
   /**
@@ -116,15 +113,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
         CompactionThroughputControllerFactory.create(server, conf);
   }
 
-  // only for test
-  public CompactSplit(Configuration conf) {
-    this.server = null;
-    this.conf = conf;
-    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true);
-    createCompactionExecutors();
-    createSplitExcecutors();
-  }
-
   private void createSplitExcecutors() {
     final String n = Thread.currentThread().getName();
     int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
@@ -250,8 +238,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     createCompactionExecutors();
   }
 
-  // set protected for test
-  protected interface CompactionCompleteTracker {
+  private interface CompactionCompleteTracker {
 
     default void completed(Store store) {
     }
@@ -329,8 +316,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     }
   }
 
-  // set protected for test
-  protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
+  private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
       boolean selectNow, CompactionLifeCycleTracker tracker,
       CompactionCompleteTracker completeTracker, User user) throws IOException {
     if (this.server.isStopped() || (region.getTableDescriptor() != null &&
@@ -378,12 +364,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     }
     pool.execute(
       new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Add compact mark for store {}, priority={}, current under compaction "
-          + "store size is {}", getStoreNameForUnderCompaction(store), priority,
-        underCompactionStores.size());
-    }
-    underCompactionStores.add(getStoreNameForUnderCompaction(store));
     region.incrementCompactionsQueuedCount();
     if (LOG.isDebugEnabled()) {
       String type = (pool == shortCompactions) ? "Small " : "Large ";
@@ -397,18 +377,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
       DUMMY_COMPLETE_TRACKER, null);
   }
 
-  public void requestSystemCompaction(HRegion region, HStore store, String why)
+  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
       throws IOException {
-    requestSystemCompaction(region, store, why, false);
-  }
-
-  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why,
-      boolean giveUpIfRequestedOrCompacting) throws IOException {
-    if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) {
-      LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region,
-        store.getColumnFamilyName());
-      return;
-    }
     requestCompactionInternal(region, store, why, NO_PRIORITY, false,
       CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
   }
@@ -501,13 +471,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     return this.regionSplitLimit;
   }
 
-  /**
-   * Check if this store is under compaction
-   */
-  public boolean isUnderCompaction(final HStore s) {
-    return underCompactionStores.contains(getStoreNameForUnderCompaction(s));
-  }
-
   private static final Comparator<Runnable> COMPARATOR =
       new Comparator<Runnable>() {
 
@@ -687,22 +650,13 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
 
     @Override
     public void run() {
-      try {
-        Preconditions.checkNotNull(server);
-        if (server.isStopped() || (region.getTableDescriptor() != null &&
-            !region.getTableDescriptor().isCompactionEnabled())) {
-          region.decrementCompactionsQueuedCount();
-          return;
-        }
-        doCompaction(user);
-      } finally {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Remove under compaction mark for store: {}",
-            store.getHRegion().getRegionInfo().getEncodedName() + ":" + store
-              .getColumnFamilyName());
-        }
-        underCompactionStores.remove(getStoreNameForUnderCompaction(store));
+      Preconditions.checkNotNull(server);
+      if (server.isStopped() || (region.getTableDescriptor() != null &&
+        !region.getTableDescriptor().isCompactionEnabled())) {
+        region.decrementCompactionsQueuedCount();
+        return;
       }
+      doCompaction(user);
     }
 
     private String formatStackTrace(Exception ex) {
@@ -870,10 +824,4 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     return shortCompactions;
   }
 
-  private String getStoreNameForUnderCompaction(HStore store) {
-    return String.format("%s:%s",
-      store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "",
-      store.getColumnFamilyName());
-  }
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index b534f68..dd9720c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
-
 import edu.umd.cs.findbugs.annotations.Nullable;
 import io.opentelemetry.api.trace.Span;
 import java.io.EOFException;
@@ -69,7 +68,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -179,7 +177,6 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -7053,10 +7050,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           HStore store = getStore(family);
           try {
             if (this.rsServices != null && store.needsCompaction()) {
-              this.rsServices.getCompactionRequestor().requestSystemCompaction(this, store,
-                  "bulkload hfiles request compaction", true);
-              LOG.info("Request compaction for region {} family {} after bulk load",
-                  this.getRegionInfo().getEncodedName(), store.getColumnFamilyName());
+              this.rsServices.getCompactionRequestor().requestCompaction(this, store,
+                "bulkload hfiles request compaction", Store.PRIORITY_USER + 1,
+                CompactionLifeCycleTracker.DUMMY, null);
+              LOG.debug("bulkload hfiles request compaction region : {}, family : {}",
+                this.getRegionInfo(), family);
             }
           } catch (IOException e) {
             LOG.error("bulkload hfiles request compaction error ", e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java
index 31a7ca7..e5f5360 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java
@@ -45,12 +45,6 @@ public interface CompactionRequester {
       CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException;
 
   /**
-   * Request system compaction on the given store.
-   */
-  void requestSystemCompaction(HRegion region, HStore store, String why,
-      boolean giveUpIfRequestedOrCompacting) throws IOException;
-
-  /**
    * on/off compaction
    */
   void switchCompaction(boolean onOrOff);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java
index b17995a..c736513 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java
@@ -18,49 +18,49 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.apache.hadoop.hbase.regionserver.HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE;
-
-import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.hamcrest.MockitoHamcrest.argThat;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 @Category(SmallTests.class)
 public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
+  private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
+  private final CompactionRequester compactionRequester = mock(CompactSplit.class);
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestCompactionAfterBulkLoad.class);
 
-  private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
-  public static AtomicInteger called = new AtomicInteger(0);
-
   @Override
   protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
       byte[]... families) throws IOException {
@@ -79,9 +79,7 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
   }
 
   @Test
-  public void shouldRequestCompactAllStoresAfterBulkLoad() throws IOException {
-    final CompactSplit compactSplit = new TestCompactSplit(HBaseConfiguration.create());
-    called.set(0);
+  public void shouldRequestCompactionAfterBulkLoad() throws IOException {
     List<Pair<byte[], String>> familyPaths = new ArrayList<>();
     // enough hfile to request compaction
     for (int i = 0; i < 5; i++) {
@@ -90,7 +88,7 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
     try {
       conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
       when(regionServerServices.getConfiguration()).thenReturn(conf);
-      when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit);
+      when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester);
       when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
           .thenAnswer(new Answer() {
             @Override
@@ -105,77 +103,14 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
             }
           });
 
-      HRegion region = testRegionWithFamilies(family1, family2, family3);
-      region.bulkLoadHFiles(familyPaths, false, null);
-      assertEquals(3, called.get());
+      Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(),
+        any(), any());
+      testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null);
+      // invoke three times for 3 families
+      verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class),
+        isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null));
     } finally {
       conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
     }
   }
-
-  @Test
-  public void testAvoidRepeatedlyRequestCompactAfterBulkLoad() throws IOException {
-    final CompactSplit compactSplit = new TestFamily1UnderCompact(HBaseConfiguration.create());
-    called.set(0);
-    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
-    // enough hfile to request compaction
-    for (int i = 0; i < 5; i++) {
-      familyPaths.addAll(withFamilyPathsFor(family1, family2, family3));
-    }
-    try {
-      conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
-      when(regionServerServices.getConfiguration()).thenReturn(conf);
-      when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit);
-      when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
-        .thenAnswer(new Answer() {
-          @Override
-          public Object answer(InvocationOnMock invocation) {
-            WALKeyImpl walKey = invocation.getArgument(1);
-            MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
-            if (mvcc != null) {
-              MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
-              walKey.setWriteEntry(we);
-            }
-            return 01L;
-          }
-        });
-
-      HRegion region = testRegionWithFamilies(family1, family2, family3);
-      region.bulkLoadHFiles(familyPaths, false, null);
-      // invoke three times for 2 families
-      assertEquals(2, called.get());
-    } finally {
-      conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
-    }
-  }
-
-  private class TestCompactSplit extends CompactSplit {
-
-    TestCompactSplit(Configuration conf) {
-      super(conf);
-    }
-
-    @Override
-    protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
-      boolean selectNow, CompactionLifeCycleTracker tracker,
-      CompactionCompleteTracker completeTracker, User user) throws IOException {
-      called.addAndGet(1);
-    }
-  }
-
-  private class TestFamily1UnderCompact extends TestCompactSplit {
-
-    TestFamily1UnderCompact(Configuration conf) {
-      super(conf);
-    }
-
-    @Override
-    public boolean isUnderCompaction(final HStore s) {
-      if (s.getColumnFamilyName().equals(Bytes.toString(family1))) {
-        return true;
-      }
-      return super.isUnderCompaction(s);
-    }
-  }
-
 }