You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ha...@apache.org on 2021/11/23 07:53:26 UTC
[hbase] 02/02: HBASE-26249 Ameliorate compaction made by bulk-loading files (#3831) (#3874)
This is an automated email from the ASF dual-hosted git repository.
haxiaolin pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit f8d03d122769b98d896b3a461c9dbb6a03976a05
Author: Xiaolin Ha <ha...@apache.org>
AuthorDate: Tue Nov 23 15:45:21 2021 +0800
HBASE-26249 Ameliorate compaction made by bulk-loading files (#3831) (#3874)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../hadoop/hbase/regionserver/CompactSplit.java | 70 ++++++++++++--
.../apache/hadoop/hbase/regionserver/HRegion.java | 12 ++-
.../compactions/CompactionRequester.java | 6 ++
.../regionserver/TestCompactionAfterBulkLoad.java | 101 +++++++++++++++++----
4 files changed, 157 insertions(+), 32 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index d40a882..bb8c9b8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -27,7 +27,9 @@ import java.io.StringWriter;
import java.util.Comparator;
import java.util.Iterator;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
@@ -92,6 +94,7 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
private volatile ThreadPoolExecutor splits;
private volatile ThroughputController compactionThroughputController;
+ private volatile Set<String> underCompactionStores = ConcurrentHashMap.newKeySet();
private volatile boolean compactionsEnabled;
/**
@@ -113,6 +116,15 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
CompactionThroughputControllerFactory.create(server, conf);
}
+ // only for test
+ public CompactSplit(Configuration conf) {
+ this.server = null;
+ this.conf = conf;
+ this.compactionsEnabled = this.conf.getBoolean(HBASE_REGION_SERVER_ENABLE_COMPACTION,true);
+ createCompactionExecutors();
+ createSplitExcecutors();
+ }
+
private void createSplitExcecutors() {
final String n = Thread.currentThread().getName();
int splitThreads = conf.getInt(SPLIT_THREADS, SPLIT_THREADS_DEFAULT);
@@ -238,7 +250,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
createCompactionExecutors();
}
- private interface CompactionCompleteTracker {
+ // set protected for test
+ protected interface CompactionCompleteTracker {
default void completed(Store store) {
}
@@ -316,7 +329,8 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
}
}
- private void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
+ // set protected for test
+ protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
boolean selectNow, CompactionLifeCycleTracker tracker,
CompactionCompleteTracker completeTracker, User user) throws IOException {
if (this.server.isStopped() || (region.getTableDescriptor() != null &&
@@ -364,6 +378,12 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
}
pool.execute(
new CompactionRunner(store, region, compaction, tracker, completeTracker, pool, user));
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Add compact mark for store {}, priority={}, current under compaction "
+ + "store size is {}", getStoreNameForUnderCompaction(store), priority,
+ underCompactionStores.size());
+ }
+ underCompactionStores.add(getStoreNameForUnderCompaction(store));
region.incrementCompactionsQueuedCount();
if (LOG.isDebugEnabled()) {
String type = (pool == shortCompactions) ? "Small " : "Large ";
@@ -377,8 +397,18 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
DUMMY_COMPLETE_TRACKER, null);
}
- public synchronized void requestSystemCompaction(HRegion region, HStore store, String why)
+ public void requestSystemCompaction(HRegion region, HStore store, String why)
throws IOException {
+ requestSystemCompaction(region, store, why, false);
+ }
+
+ public synchronized void requestSystemCompaction(HRegion region, HStore store, String why,
+ boolean giveUpIfRequestedOrCompacting) throws IOException {
+ if (giveUpIfRequestedOrCompacting && isUnderCompaction(store)) {
+ LOG.debug("Region {} store {} is under compaction now, skip to request compaction", region,
+ store.getColumnFamilyName());
+ return;
+ }
requestCompactionInternal(region, store, why, NO_PRIORITY, false,
CompactionLifeCycleTracker.DUMMY, DUMMY_COMPLETE_TRACKER, null);
}
@@ -471,6 +501,13 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
return this.regionSplitLimit;
}
+ /**
+ * Check if this store is under compaction
+ */
+ public boolean isUnderCompaction(final HStore s) {
+ return underCompactionStores.contains(getStoreNameForUnderCompaction(s));
+ }
+
private static final Comparator<Runnable> COMPARATOR =
new Comparator<Runnable>() {
@@ -650,13 +687,22 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
@Override
public void run() {
- Preconditions.checkNotNull(server);
- if (server.isStopped() || (region.getTableDescriptor() != null &&
- !region.getTableDescriptor().isCompactionEnabled())) {
- region.decrementCompactionsQueuedCount();
- return;
+ try {
+ Preconditions.checkNotNull(server);
+ if (server.isStopped() || (region.getTableDescriptor() != null &&
+ !region.getTableDescriptor().isCompactionEnabled())) {
+ region.decrementCompactionsQueuedCount();
+ return;
+ }
+ doCompaction(user);
+ } finally {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Remove under compaction mark for store: {}",
+ store.getHRegion().getRegionInfo().getEncodedName() + ":" + store
+ .getColumnFamilyName());
+ }
+ underCompactionStores.remove(getStoreNameForUnderCompaction(store));
}
- doCompaction(user);
}
private String formatStackTrace(Exception ex) {
@@ -824,4 +870,10 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
return shortCompactions;
}
+ private String getStoreNameForUnderCompaction(HStore store) {
+ return String.format("%s:%s",
+ store.getHRegion() != null ? store.getHRegion().getRegionInfo().getEncodedName() : "",
+ store.getColumnFamilyName());
+ }
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index e412c47..3db87ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
import edu.umd.cs.findbugs.annotations.Nullable;
import io.opentelemetry.api.trace.Span;
import java.io.EOFException;
@@ -72,6 +73,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -196,6 +198,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -7046,11 +7049,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
HStore store = getStore(family);
try {
if (this.rsServices != null && store.needsCompaction()) {
- this.rsServices.getCompactionRequestor().requestCompaction(this, store,
- "bulkload hfiles request compaction", Store.PRIORITY_USER + 1,
- CompactionLifeCycleTracker.DUMMY, null);
- LOG.debug("bulkload hfiles request compaction region : {}, family : {}",
- this.getRegionInfo(), family);
+ this.rsServices.getCompactionRequestor().requestSystemCompaction(this, store,
+ "bulkload hfiles request compaction", true);
+ LOG.info("Request compaction for region {} family {} after bulk load",
+ this.getRegionInfo().getEncodedName(), store.getColumnFamilyName());
}
} catch (IOException e) {
LOG.error("bulkload hfiles request compaction error ", e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java
index e5f5360..31a7ca7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequester.java
@@ -45,6 +45,12 @@ public interface CompactionRequester {
CompactionLifeCycleTracker tracker, @Nullable User user) throws IOException;
/**
+ * Request system compaction on the given store.
+ */
+ void requestSystemCompaction(HRegion region, HStore store, String why,
+ boolean giveUpIfRequestedOrCompacting) throws IOException;
+
+ /**
* on/off compaction
*/
void switchCompaction(boolean onOrOff);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java
index c736513..b17995a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactionAfterBulkLoad.java
@@ -18,49 +18,49 @@
package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.regionserver.HRegion.COMPACTION_AFTER_BULKLOAD_ENABLE;
+
+import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.hamcrest.MockitoHamcrest.argThat;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequester;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@Category(SmallTests.class)
public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
- private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
- private final CompactionRequester compactionRequester = mock(CompactSplit.class);
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestCompactionAfterBulkLoad.class);
+ private final RegionServerServices regionServerServices = mock(RegionServerServices.class);
+ public static AtomicInteger called = new AtomicInteger(0);
+
@Override
protected HRegion testRegionWithFamiliesAndSpecifiedTableName(TableName tableName,
byte[]... families) throws IOException {
@@ -79,7 +79,9 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
}
@Test
- public void shouldRequestCompactionAfterBulkLoad() throws IOException {
+ public void shouldRequestCompactAllStoresAfterBulkLoad() throws IOException {
+ final CompactSplit compactSplit = new TestCompactSplit(HBaseConfiguration.create());
+ called.set(0);
List<Pair<byte[], String>> familyPaths = new ArrayList<>();
// enough hfile to request compaction
for (int i = 0; i < 5; i++) {
@@ -88,7 +90,7 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
try {
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
when(regionServerServices.getConfiguration()).thenReturn(conf);
- when(regionServerServices.getCompactionRequestor()).thenReturn(compactionRequester);
+ when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit);
when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
.thenAnswer(new Answer() {
@Override
@@ -103,14 +105,77 @@ public class TestCompactionAfterBulkLoad extends TestBulkloadBase {
}
});
- Mockito.doNothing().when(compactionRequester).requestCompaction(any(), any(), any(), anyInt(),
- any(), any());
- testRegionWithFamilies(family1, family2, family3).bulkLoadHFiles(familyPaths, false, null);
- // invoke three times for 3 families
- verify(compactionRequester, times(3)).requestCompaction(isA(HRegion.class), isA(HStore.class),
- isA(String.class), anyInt(), eq(CompactionLifeCycleTracker.DUMMY), eq(null));
+ HRegion region = testRegionWithFamilies(family1, family2, family3);
+ region.bulkLoadHFiles(familyPaths, false, null);
+ assertEquals(3, called.get());
} finally {
conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
}
}
+
+ @Test
+ public void testAvoidRepeatedlyRequestCompactAfterBulkLoad() throws IOException {
+ final CompactSplit compactSplit = new TestFamily1UnderCompact(HBaseConfiguration.create());
+ called.set(0);
+ List<Pair<byte[], String>> familyPaths = new ArrayList<>();
+ // enough hfile to request compaction
+ for (int i = 0; i < 5; i++) {
+ familyPaths.addAll(withFamilyPathsFor(family1, family2, family3));
+ }
+ try {
+ conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, true);
+ when(regionServerServices.getConfiguration()).thenReturn(conf);
+ when(regionServerServices.getCompactionRequestor()).thenReturn(compactSplit);
+ when(log.appendMarker(any(), any(), argThat(bulkLogWalEditType(WALEdit.BULK_LOAD))))
+ .thenAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) {
+ WALKeyImpl walKey = invocation.getArgument(1);
+ MultiVersionConcurrencyControl mvcc = walKey.getMvcc();
+ if (mvcc != null) {
+ MultiVersionConcurrencyControl.WriteEntry we = mvcc.begin();
+ walKey.setWriteEntry(we);
+ }
+ return 01L;
+ }
+ });
+
+ HRegion region = testRegionWithFamilies(family1, family2, family3);
+ region.bulkLoadHFiles(familyPaths, false, null);
+ // invoke three times for 2 families
+ assertEquals(2, called.get());
+ } finally {
+ conf.setBoolean(COMPACTION_AFTER_BULKLOAD_ENABLE, false);
+ }
+ }
+
+ private class TestCompactSplit extends CompactSplit {
+
+ TestCompactSplit(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ protected void requestCompactionInternal(HRegion region, HStore store, String why, int priority,
+ boolean selectNow, CompactionLifeCycleTracker tracker,
+ CompactionCompleteTracker completeTracker, User user) throws IOException {
+ called.addAndGet(1);
+ }
+ }
+
+ private class TestFamily1UnderCompact extends TestCompactSplit {
+
+ TestFamily1UnderCompact(Configuration conf) {
+ super(conf);
+ }
+
+ @Override
+ public boolean isUnderCompaction(final HStore s) {
+ if (s.getColumnFamilyName().equals(Bytes.toString(family1))) {
+ return true;
+ }
+ return super.isUnderCompaction(s);
+ }
+ }
+
}