You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to commits@hbase.apache.org by ha...@apache.org on 2021/11/23 07:53:24 UTC

[hbase] branch branch-2 updated (5843a9a -> f8d03d1)

This is an automated email from the ASF dual-hosted git repository.

haxiaolin pushed a change to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git.


    from 5843a9a  Backport HBASE-26249 Ameliorate compaction made by bulk-loading files (#3831) (#3874)
     new 6140625  Revert "Backport HBASE-26249 Ameliorate compaction made by bulk-loading files (#3831) (#3874)"
     new f8d03d1  HBASE-26249 Ameliorate compaction made by bulk-loading files (#3831) (#3874)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:

[hbase] 01/02: Revert "Backport HBASE-26249 Ameliorate compaction made by bulk-loading files (#3831) (#3874)"

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haxiaolin pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 61406252a50689844c0d832752e021691b0f4e78
Author: haxiaolin <ha...@apache.org>
AuthorDate: Tue Nov 23 15:52:34 2021 +0800

    Revert "Backport HBASE-26249 Ameliorate compaction made by bulk-loading files (#3831) (#3874)"
    
    This reverts commit 5843a9a44c3e662675f0f2003235123480b4be4b.
---
 .../hadoop/hbase/regionserver/CompactSplit.java    |  70 ++------------
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  12 +--
 .../compactions/CompactionRequester.java           |   6 --
 .../regionserver/TestCompactionAfterBulkLoad.java  | 101 ++++-----------------
 4 files changed, 32 insertions(+), 157 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index bb8c9b8..d40a882 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -27,9 +27,7 @@ import java.io.StringWriter;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Optional;
-import java.util.Set;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
@@ -94,7 +92,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
   private volatile ThreadPoolExecutor splits;
 
   private volatile ThroughputController compactionThroughputController;
-  private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet();
 
   private volatile boolean compactionsEnabled;
   /**
@@ -116,15 +113,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
         CompactionThroughputControllerFactory.create(server, conf);
   }
 
-  // only for test
-  public CompactSplit(Configuration conf) {
-    this.server = null;
-    this.conf = conf;
-    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true);
-    createCompactionExecutors();
-    createSplitExcecutors();
-  }
-
   private void createSplitExcecutors() {
     final String n = Thread.currentThread().getName();
     int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
@@ -250,8 +238,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     createCompactionExecutors();
   }
 
-  // set protected for test
-  protected interface CompactionCompleteTracker {
+  private interface CompactionCompleteTracker {
 
     default void completed(Store store) {
     }
@@ -329,8 +316,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     }
   }
 
-  // set protected for test
-  protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
+  private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
       boolean selectNow, CompactionLifeCycleTracker tracker,
       CompactionCompleteTracker completeTracker, User user) throws IOException {
     if (this.server.isStopped() || (region.getTableDescriptor() != null &&
@@ -378,12 +364,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     }
     pool.execute(
       new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Add compact mark for store {}, priority={}, current under compaction "
-          + "store size is {}", getStoreNameForUnderCompaction(store), priority,
-        underCompactionStores.size());
-    }
-    underCompactionStores.add(getStoreNameForUnderCompaction(store));
     region.incrementCompactionsQueuedCount();
     if (LOG.isDebugEnabled()) {
       String type = (pool == shortCompactions) ? "Small " : "Large ";
@@ -397,18 +377,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
       DUMMY_COMPLETE_TRACKER, null);
   }
 
-  public void requestSystemCompaction(HRegion region, HStore store, String why)
+  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
       throws IOException {
-    requestSystemCompaction(region, store, why, false);
-  }
-
-  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why,
-      boolean giveUpIfRequestedOrCompacting) throws IOException {
-    if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) {
-      LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region,
-        store.getColumnFamilyName());
-      return;
-    }
     requestCompactionInternal(region, store, why, NO_PRIORITY, false,
       CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
   }
@@ -501,13 +471,6 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     return this.regionSplitLimit;
   }
 
-  /**
-   * Check if this store is under compaction
-   */
-  public boolean isUnderCompaction(final HStore s) {
-    return underCompactionStores.contains(getStoreNameForUnderCompaction(s));
-  }
-
   private static final Comparator<Runnable> COMPARATOR =
       new Comparator<Runnable>() {
 
@@ -687,22 +650,13 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
 
     @Override
     public void run() {
-      try {
-        Preconditions.checkNotNull(server);
-        if (server.isStopped() || (region.getTableDescriptor() != null &&
-            !region.getTableDescriptor().isCompactionEnabled())) {
-          region.decrementCompactionsQueuedCount();
-          return;
-        }
-        doCompaction(user);
-      } finally {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Remove under compaction mark for store: {}",
-            store.getHRegion().getRegionInfo().getEncodedName() + ":" + store
-              .getColumnFamilyName());
-        }
-        underCompactionStores.remove(getStoreNameForUnderCompaction(store));
+      Preconditions.checkNotNull(server);
+      if (server.isStopped() || (region.getTableDescriptor() != null &&
+          !region.getTableDescriptor().isCompactionEnabled())) {
+        region.decrementCompactionsQueuedCount();
+        return;
       }
+      doCompaction(user);
     }
 
     private String formatStackTrace(Exception ex) {
@@ -870,10 +824,4 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     return shortCompactions;
   }
 
-  private String getStoreNameForUnderCompaction(HStore store) {
-    return String.format("%s:%s",
-      store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "",
-      store.getColumnFamilyName());
-  }
-
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 3db87ec..e412c47 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
-
 import edu.umd.cs.findbugs.annotations.Nullable;
 import io.opentelemetry.api.trace.Span;
 import java.io.EOFException;
@@ -73,7 +72,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -198,7 +196,6 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -7049,10 +7046,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           HStore store = getStore(family);
           try {
             if (this.rsServices != null && store.needsCompaction()) {
-              this.rsServices.getCompactionRequestor().requestSystemCompaction(this, store,
-                  "bulkload hfiles request compaction", true);
-              LOG.info("Request compaction for region {} family {} after bulk load",
-                  this.getRegionInfo().getEncodedName(), store.getColumnFamilyName());
+              this.rsServices.getCompactionRequestor().requestCompaction(this, store,
+                "bulkload hfiles request compaction", Store.PRIORITY_USER + 1,
+                CompactionLifeCycleTracker.DUMMY, null);
+              LOG.debug("bulkload hfiles request compaction region : {}, family : {}",
+                this.getRegionInfo(), family);
             }
           } catch (IOException e) {
             LOG.error("bulkload hfiles request compaction error ", e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java
index 31a7ca7..e5f5360 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java
@@ -45,12 +45,6 @@ public interface CompactionRequester {
       CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException;
 
   /**
-   * Request system compaction on the given store.
-   */
-  void requestSystemCompaction(HRegion region, HStore store, String why,
-      boolean giveUpIfRequestedOrCompacting) throws IOException;
-
-  /**
    * on/off compaction
    */
   void switchCompaction(boolean onOrOff);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java
index b17995a..c736513 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java
@@ -18,49 +18,49 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.apache.hadoop.hbase.regionserver.HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE;
-
-import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.hamcrest.MockitoHamcrest.argThat;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 @Category(SmallTests.class)
 public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
+  private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
+  private final CompactionRequester compactionRequester = mock(CompactSplit.class);
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestCompactionAfterBulkLoad.class);
 
-  private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
-  public static AtomicInteger called = new AtomicInteger(0);
-
   @Override
   protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
       byte[]... families) throws IOException {
@@ -79,9 +79,7 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
   }
 
   @Test
-  public void shouldRequestCompactAllStoresAfterBulkLoad() throws IOException {
-    final CompactSplit compactSplit = new TestCompactSplit(HBaseConfiguration.create());
-    called.set(0);
+  public void shouldRequestCompactionAfterBulkLoad() throws IOException {
     List<Pair<byte[], String>> familyPaths = new ArrayList<>();
     // enough hfile to request compaction
     for (int i = 0; i < 5; i++) {
@@ -90,7 +88,7 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
     try {
       conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
       when(regionServerServices.getConfiguration()).thenReturn(conf);
-      when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit);
+      when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester);
       when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
           .thenAnswer(new Answer() {
             @Override
@@ -105,77 +103,14 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
             }
           });
 
-      HRegion region = testRegionWithFamilies(family1, family2, family3);
-      region.bulkLoadHFiles(familyPaths, false, null);
-      assertEquals(3, called.get());
+      Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(),
+        any(), any());
+      testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null);
+      // invoke three times for 3 families
+      verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class),
+        isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null));
     } finally {
       conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
     }
   }
-
-  @Test
-  public void testAvoidRepeatedlyRequestCompactAfterBulkLoad() throws IOException {
-    final CompactSplit compactSplit = new TestFamily1UnderCompact(HBaseConfiguration.create());
-    called.set(0);
-    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
-    // enough hfile to request compaction
-    for (int i = 0; i < 5; i++) {
-      familyPaths.addAll(withFamilyPathsFor(family1, family2, family3));
-    }
-    try {
-      conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
-      when(regionServerServices.getConfiguration()).thenReturn(conf);
-      when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit);
-      when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
-        .thenAnswer(new Answer() {
-          @Override
-          public Object answer(InvocationOnMock invocation) {
-            WALKeyImpl walKey = invocation.getArgument(1);
-            MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
-            if (mvcc != null) {
-              MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
-              walKey.setWriteEntry(we);
-            }
-            return 01L;
-          }
-        });
-
-      HRegion region = testRegionWithFamilies(family1, family2, family3);
-      region.bulkLoadHFiles(familyPaths, false, null);
-      // invoke three times for 2 families
-      assertEquals(2, called.get());
-    } finally {
-      conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
-    }
-  }
-
-  private class TestCompactSplit extends CompactSplit {
-
-    TestCompactSplit(Configuration conf) {
-      super(conf);
-    }
-
-    @Override
-    protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
-      boolean selectNow, CompactionLifeCycleTracker tracker,
-      CompactionCompleteTracker completeTracker, User user) throws IOException {
-      called.addAndGet(1);
-    }
-  }
-
-  private class TestFamily1UnderCompact extends TestCompactSplit {
-
-    TestFamily1UnderCompact(Configuration conf) {
-      super(conf);
-    }
-
-    @Override
-    public boolean isUnderCompaction(final HStore s) {
-      if (s.getColumnFamilyName().equals(Bytes.toString(family1))) {
-        return true;
-      }
-      return super.isUnderCompaction(s);
-    }
-  }
-
 }

[hbase] 02/02: HBASE-26249 Ameliorate compaction made by bulk-loading files (#3831) (#3874)

Posted by ha...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

haxiaolin pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit f8d03d122769b98d896b3a461c9dbb6a03976a05
Author: Xiaolin Ha <ha...@apache.org>
AuthorDate: Tue Nov 23 15:45:21 2021 +0800

    HBASE-26249 Ameliorate compaction made by bulk-loading files (#3831) (#3874)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../hadoop/hbase/regionserver/CompactSplit.java    |  70 ++++++++++++--
 .../apache/hadoop/hbase/regionserver/HRegion.java  |  12 ++-
 .../compactions/CompactionRequester.java           |   6 ++
 .../regionserver/TestCompactionAfterBulkLoad.java  | 101 +++++++++++++++++----
 4 files changed, 157 insertions(+), 32 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index d40a882..bb8c9b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -27,7 +27,9 @@ import java.io.StringWriter;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.RejectedExecutionHandler;
@@ -92,6 +94,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
   private volatile ThreadPoolExecutor splits;
 
   private volatile ThroughputController compactionThroughputController;
+  private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet();
 
   private volatile boolean compactionsEnabled;
   /**
@@ -113,6 +116,15 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
         CompactionThroughputControllerFactory.create(server, conf);
   }
 
+  // only for test
+  public CompactSplit(Configuration conf) {
+    this.server = null;
+    this.conf = conf;
+    this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true);
+    createCompactionExecutors();
+    createSplitExcecutors();
+  }
+
   private void createSplitExcecutors() {
     final String n = Thread.currentThread().getName();
     int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
@@ -238,7 +250,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     createCompactionExecutors();
   }
 
-  private interface CompactionCompleteTracker {
+  // set protected for test
+  protected interface CompactionCompleteTracker {
 
     default void completed(Store store) {
     }
@@ -316,7 +329,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     }
   }
 
-  private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
+  // set protected for test
+  protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
       boolean selectNow, CompactionLifeCycleTracker tracker,
       CompactionCompleteTracker completeTracker, User user) throws IOException {
     if (this.server.isStopped() || (region.getTableDescriptor() != null &&
@@ -364,6 +378,12 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     }
     pool.execute(
       new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Add compact mark for store {}, priority={}, current under compaction "
+          + "store size is {}", getStoreNameForUnderCompaction(store), priority,
+        underCompactionStores.size());
+    }
+    underCompactionStores.add(getStoreNameForUnderCompaction(store));
     region.incrementCompactionsQueuedCount();
     if (LOG.isDebugEnabled()) {
       String type = (pool == shortCompactions) ? "Small " : "Large ";
@@ -377,8 +397,18 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
       DUMMY_COMPLETE_TRACKER, null);
   }
 
-  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
+  public void requestSystemCompaction(HRegion region, HStore store, String why)
       throws IOException {
+    requestSystemCompaction(region, store, why, false);
+  }
+
+  public synchronized void requestSystemCompaction(HRegion region, HStore store, String why,
+      boolean giveUpIfRequestedOrCompacting) throws IOException {
+    if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) {
+      LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region,
+        store.getColumnFamilyName());
+      return;
+    }
     requestCompactionInternal(region, store, why, NO_PRIORITY, false,
       CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
   }
@@ -471,6 +501,13 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     return this.regionSplitLimit;
   }
 
+  /**
+   * Check if this store is under compaction
+   */
+  public boolean isUnderCompaction(final HStore s) {
+    return underCompactionStores.contains(getStoreNameForUnderCompaction(s));
+  }
+
   private static final Comparator<Runnable> COMPARATOR =
       new Comparator<Runnable>() {
 
@@ -650,13 +687,22 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
 
     @Override
     public void run() {
-      Preconditions.checkNotNull(server);
-      if (server.isStopped() || (region.getTableDescriptor() != null &&
-          !region.getTableDescriptor().isCompactionEnabled())) {
-        region.decrementCompactionsQueuedCount();
-        return;
+      try {
+        Preconditions.checkNotNull(server);
+        if (server.isStopped() || (region.getTableDescriptor() != null &&
+            !region.getTableDescriptor().isCompactionEnabled())) {
+          region.decrementCompactionsQueuedCount();
+          return;
+        }
+        doCompaction(user);
+      } finally {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Remove under compaction mark for store: {}",
+            store.getHRegion().getRegionInfo().getEncodedName() + ":" + store
+              .getColumnFamilyName());
+        }
+        underCompactionStores.remove(getStoreNameForUnderCompaction(store));
       }
-      doCompaction(user);
     }
 
     private String formatStackTrace(Exception ex) {
@@ -824,4 +870,10 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
     return shortCompactions;
   }
 
+  private String getStoreNameForUnderCompaction(HStore store) {
+    return String.format("%s:%s",
+      store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "",
+      store.getColumnFamilyName());
+  }
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index e412c47..3db87ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
 import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
 import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
 import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
 import edu.umd.cs.findbugs.annotations.Nullable;
 import io.opentelemetry.api.trace.Span;
 import java.io.EOFException;
@@ -72,6 +73,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -196,6 +198,7 @@ import org.apache.hadoop.util.StringUtils;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
 import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
 import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -7046,11 +7049,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           HStore store = getStore(family);
           try {
             if (this.rsServices != null && store.needsCompaction()) {
-              this.rsServices.getCompactionRequestor().requestCompaction(this, store,
-                "bulkload hfiles request compaction", Store.PRIORITY_USER + 1,
-                CompactionLifeCycleTracker.DUMMY, null);
-              LOG.debug("bulkload hfiles request compaction region : {}, family : {}",
-                this.getRegionInfo(), family);
+              this.rsServices.getCompactionRequestor().requestSystemCompaction(this, store,
+                  "bulkload hfiles request compaction", true);
+              LOG.info("Request compaction for region {} family {} after bulk load",
+                  this.getRegionInfo().getEncodedName(), store.getColumnFamilyName());
             }
           } catch (IOException e) {
             LOG.error("bulkload hfiles request compaction error ", e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java
index e5f5360..31a7ca7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java
@@ -45,6 +45,12 @@ public interface CompactionRequester {
       CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException;
 
   /**
+   * Request system compaction on the given store.
+   */
+  void requestSystemCompaction(HRegion region, HStore store, String why,
+      boolean giveUpIfRequestedOrCompacting) throws IOException;
+
+  /**
    * on/off compaction
    */
   void switchCompaction(boolean onOrOff);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java
index c736513..b17995a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java
@@ -18,49 +18,49 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import static org.apache.hadoop.hbase.regionserver.HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE;
+
+import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
 import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.mockito.hamcrest.MockitoHamcrest.argThat;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.RegionInfoBuilder;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKeyImpl;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
 @Category(SmallTests.class)
 public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
-  private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
-  private final CompactionRequester compactionRequester = mock(CompactSplit.class);
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestCompactionAfterBulkLoad.class);
 
+  private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
+  public static AtomicInteger called = new AtomicInteger(0);
+
   @Override
   protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
       byte[]... families) throws IOException {
@@ -79,7 +79,9 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
   }
 
   @Test
-  public void shouldRequestCompactionAfterBulkLoad() throws IOException {
+  public void shouldRequestCompactAllStoresAfterBulkLoad() throws IOException {
+    final CompactSplit compactSplit = new TestCompactSplit(HBaseConfiguration.create());
+    called.set(0);
     List<Pair<byte[], String>> familyPaths = new ArrayList<>();
     // enough hfile to request compaction
     for (int i = 0; i < 5; i++) {
@@ -88,7 +90,7 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
     try {
       conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
       when(regionServerServices.getConfiguration()).thenReturn(conf);
-      when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester);
+      when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit);
       when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
           .thenAnswer(new Answer() {
             @Override
@@ -103,14 +105,77 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
             }
           });
 
-      Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(),
-        any(), any());
-      testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null);
-      // invoke three times for 3 families
-      verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class),
-        isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null));
+      HRegion region = testRegionWithFamilies(family1, family2, family3);
+      region.bulkLoadHFiles(familyPaths, false, null);
+      assertEquals(3, called.get());
     } finally {
       conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
     }
   }
+
+  @Test
+  public void testAvoidRepeatedlyRequestCompactAfterBulkLoad() throws IOException {
+    final CompactSplit compactSplit = new TestFamily1UnderCompact(HBaseConfiguration.create());
+    called.set(0); // reset the static counter that the CompactSplit stubs increment
+    List<Pair<byte[], String>> familyPaths = new ArrayList<>();
+    // five rounds of hfiles for each of the three families, enough to request compaction
+    for (int i = 0; i < 5; i++) {
+      familyPaths.addAll(withFamilyPathsFor(family1, family2, family3));
+    }
+    try {
+      conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
+      when(regionServerServices.getConfiguration()).thenReturn(conf);
+      when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit);
+      when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
+        .thenAnswer(new Answer() {
+          @Override
+          public Object answer(InvocationOnMock invocation) {
+            WALKeyImpl walKey = invocation.getArgument(1);
+            MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
+            if (mvcc != null) {
+              MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
+              walKey.setWriteEntry(we);
+            }
+            return 01L; // octal literal; same value as 1L
+          }
+        });
+
+      HRegion region = testRegionWithFamilies(family1, family2, family3);
+      region.bulkLoadHFiles(familyPaths, false, null);
+      // family1 reports as under compaction, so expect requests for the other 2 families only
+      assertEquals(2, called.get());
+    } finally {
+      conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
+    }
+  }
+
+  private class TestCompactSplit extends CompactSplit { // stub: counts compaction requests
+
+    TestCompactSplit(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
+      boolean selectNow, CompactionLifeCycleTracker tracker,
+      CompactionCompleteTracker completeTracker, User user) throws IOException {
+      called.addAndGet(1); // only record the request; the superclass method is not invoked
+    }
+  }
+
+  private class TestFamily1UnderCompact extends TestCompactSplit { // family1 always "compacting"
+
+    TestFamily1UnderCompact(Configuration conf) {
+      super(conf);
+    }
+
+    @Override
+    public boolean isUnderCompaction(final HStore s) {
+      if (s.getColumnFamilyName().equals(Bytes.toString(family1))) {
+        return true; // the store for family1 is always reported as under compaction
+      }
+      return super.isUnderCompaction(s); // every other store uses the inherited check
+    }
+  }
+
 }