Posted to commits@hbase.apache.org by vj...@apache.org on 2020/05/27 15:29:13 UTC
[hbase] branch branch-2 updated: HBASE-24428 : Update compaction priority for recently split daughter regions (#1784)
This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 1e86ff0 HBASE-24428 : Update compaction priority for recently split daughter regions (#1784)
1e86ff0 is described below
commit 1e86ff09d7f5d68276ee33e3ef3d3150d38bc834
Author: Viraj Jasani <vj...@apache.org>
AuthorDate: Wed May 27 20:56:41 2020 +0530
HBASE-24428 : Update compaction priority for recently split daughter regions (#1784)
Signed-off-by: Andrew Purtell <ap...@apache.org>
---
.../apache/hadoop/hbase/regionserver/HStore.java | 23 +++++-
.../hadoop/hbase/regionserver/StoreUtils.java | 2 +-
.../compactions/CompactionRequestImpl.java | 13 +++
.../compactions/SortedCompactionPolicy.java | 1 +
.../compactions/StripeCompactionPolicy.java | 1 +
.../TestSplitTransactionOnCluster.java | 94 ++++++++++++++++++++--
6 files changed, 126 insertions(+), 8 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index f61d679..f3a1222 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -143,6 +143,12 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
public static final int DEFAULT_COMPACTCHECKER_INTERVAL_MULTIPLIER = 1000;
public static final int DEFAULT_BLOCKING_STOREFILE_COUNT = 16;
+ // HBASE-24428 : Update compaction priority for recently split daughter regions
+ // so as to prioritize their compaction.
+ // Any compaction candidate with a higher priority than the compaction of newly split daughter
+ // regions should have a priority value < (Integer.MIN_VALUE + 1000)
+ private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;
+
private static final Logger LOG = LoggerFactory.getLogger(HStore.class);
protected final MemStore memstore;
@@ -1937,7 +1943,22 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
// Set common request properties.
// Set priority, either override value supplied by caller or from store.
- request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());
+ final int compactionPriority =
+ (priority != Store.NO_PRIORITY) ? priority : getCompactPriority();
+ request.setPriority(compactionPriority);
+
+ if (request.isAfterSplit()) {
+ // If the store belongs to a recently split daughter region, it is better to consider
+ // it with higher priority in the compaction queue.
+ // Override priority if it is lower (higher int value) than
+ // SPLIT_REGION_COMPACTION_PRIORITY
+ final int splitHousekeepingPriority =
+ Math.min(compactionPriority, SPLIT_REGION_COMPACTION_PRIORITY);
+ request.setPriority(splitHousekeepingPriority);
+ LOG.info("Keeping/Overriding Compaction request priority to {} for CF {} since it"
+ + " belongs to recently split daughter region {}", splitHousekeepingPriority,
+ this.getColumnFamilyName(), getRegionInfo().getRegionNameAsString());
+ }
request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
request.setTracker(tracker);
}
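For context, and as the comment in the first hunk spells out, compaction priority in HBase is lowest-value-first: a numerically smaller priority value is served earlier by the compaction queue, which is why Math.min keeps whichever value is more urgent. A minimal standalone sketch of the decision the hunk above makes (SplitPriorityExample and effectivePriority are hypothetical names, not part of the patch):

    public class SplitPriorityExample {

      // Mirrors the HStore.SPLIT_REGION_COMPACTION_PRIORITY constant introduced by the patch.
      private static final int SPLIT_REGION_COMPACTION_PRIORITY = Integer.MIN_VALUE + 1000;

      // Hypothetical helper: the priority a compaction request would end up with.
      static int effectivePriority(int storePriority, boolean afterSplit) {
        // Lower int value == served earlier by the compaction queue.
        return afterSplit
            ? Math.min(storePriority, SPLIT_REGION_COMPACTION_PRIORITY)
            : storePriority;
      }

      public static void main(String[] args) {
        // A store at priority 13 gets bumped to the split housekeeping value.
        System.out.println(effectivePriority(13, true));                      // -2147482648
        // A caller-supplied priority that is already more urgent is kept as-is.
        System.out.println(effectivePriority(Integer.MIN_VALUE + 10, true));  // -2147483638
        // Stores that are not freshly split daughters keep their priority.
        System.out.println(effectivePriority(13, false));                     // 13
      }
    }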
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
index dd17729..0e4f6c2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
@@ -52,7 +52,7 @@ public class StoreUtils {
*/
public static boolean hasReferences(Collection<HStoreFile> files) {
// TODO: make sure that we won't pass null here in the future.
- return files != null ? files.stream().anyMatch(HStoreFile::isReference) : false;
+ return files != null && files.stream().anyMatch(HStoreFile::isReference);
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java
index 4ea8e3f..899219d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequestImpl.java
@@ -43,6 +43,7 @@ public class CompactionRequestImpl implements CompactionRequest {
private DisplayCompactionType isMajor = DisplayCompactionType.MINOR;
private int priority = NO_PRIORITY;
private Collection<HStoreFile> filesToCompact;
+ private boolean isAfterSplit = false;
// CompactRequest object creation time.
private long selectionTime;
@@ -136,6 +137,14 @@ public class CompactionRequestImpl implements CompactionRequest {
return tracker;
}
+ public boolean isAfterSplit() {
+ return isAfterSplit;
+ }
+
+ public void setAfterSplit(boolean afterSplit) {
+ isAfterSplit = afterSplit;
+ }
+
@Override
public int hashCode() {
final int prime = 31;
@@ -149,6 +158,7 @@ public class CompactionRequestImpl implements CompactionRequest {
result = prime * result + ((storeName == null) ? 0 : storeName.hashCode());
result = prime * result + (int) (totalSize ^ (totalSize >>> 32));
result = prime * result + ((tracker == null) ? 0 : tracker.hashCode());
+ result = prime * result + (isAfterSplit ? 1231 : 1237);
return result;
}
@@ -200,6 +210,9 @@ public class CompactionRequestImpl implements CompactionRequest {
if (totalSize != other.totalSize) {
return false;
}
+ if (isAfterSplit != other.isAfterSplit) {
+ return false;
+ }
if (tracker == null) {
if (other.tracker != null) {
return false;
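The new isAfterSplit field also takes part in hashCode() and equals(); 1231/1237 are the same constants java.lang.Boolean.hashCode(boolean) uses for true and false. A small self-contained sketch of that pattern (RequestSketch is a hypothetical stand-in, not the real CompactionRequestImpl):

    class RequestSketch {

      private int priority;
      private boolean isAfterSplit;

      void setAfterSplit(boolean afterSplit) {
        isAfterSplit = afterSplit;
      }

      boolean isAfterSplit() {
        return isAfterSplit;
      }

      @Override
      public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + priority;
        // Same convention as the patch: 1231 when the flag is set, 1237 otherwise.
        result = prime * result + (isAfterSplit ? 1231 : 1237);
        return result;
      }

      @Override
      public boolean equals(Object obj) {
        if (this == obj) {
          return true;
        }
        if (!(obj instanceof RequestSketch)) {
          return false;
        }
        RequestSketch other = (RequestSketch) obj;
        // Two requests differing only in isAfterSplit are no longer considered equal.
        return priority == other.priority && isAfterSplit == other.isAfterSplit;
      }
    }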
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java
index 9b30ab5..ef9f3ca 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/SortedCompactionPolicy.java
@@ -84,6 +84,7 @@ public abstract class SortedCompactionPolicy extends CompactionPolicy {
CompactionRequestImpl result = createCompactionRequest(candidateSelection,
isTryingMajor || isAfterSplit, mayUseOffPeak, mayBeStuck);
+ result.setAfterSplit(isAfterSplit);
ArrayList<HStoreFile> filesToCompact = Lists.newArrayList(result.getFiles());
removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
index 756faa9..443075c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
@@ -122,6 +122,7 @@ public class StripeCompactionPolicy extends CompactionPolicy {
SplitStripeCompactionRequest request = new SplitStripeCompactionRequest(
allFiles, OPEN_KEY, OPEN_KEY, targetKvs);
request.setMajorRangeFull();
+ request.getRequest().setAfterSplit(true);
return request;
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
index 45f35ef..0ffd607 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertNull;
@@ -26,6 +27,8 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.lang.reflect.Field;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -74,6 +77,7 @@ import org.apache.hadoop.hbase.coprocessor.MasterCoprocessor;
import org.apache.hadoop.hbase.coprocessor.MasterCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.MasterObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.master.LoadBalancer;
import org.apache.hadoop.hbase.master.MasterRpcServices;
@@ -85,6 +89,7 @@ import org.apache.hadoop.hbase.master.assignment.RegionStateNode;
import org.apache.hadoop.hbase.master.assignment.RegionStates;
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionLifeCycleTracker;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.throttle.NoLimitThroughputController;
import org.apache.hadoop.hbase.testclassification.LargeTests;
@@ -108,6 +113,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
+import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -278,6 +284,79 @@ public class TestSplitTransactionOnCluster {
assertEquals(2, cluster.getRegions(tableName).size());
}
+ @Test
+ public void testSplitCompactWithPriority() throws Exception {
+ final TableName tableName = TableName.valueOf(name.getMethodName());
+ // Create table then get the single region for our new table.
+ byte[] cf = Bytes.toBytes("cf");
+ TableDescriptor htd = TableDescriptorBuilder.newBuilder(tableName)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of(cf)).build();
+ admin.createTable(htd);
+
+ assertNotEquals("Unable to retrieve regions of the table", -1,
+ TESTING_UTIL.waitFor(10000, () -> cluster.getRegions(tableName).size() == 1));
+
+ HRegion region = cluster.getRegions(tableName).get(0);
+ HStore store = region.getStore(cf);
+ int regionServerIndex = cluster.getServerWith(region.getRegionInfo().getRegionName());
+ HRegionServer regionServer = cluster.getRegionServer(regionServerIndex);
+
+ Table table = TESTING_UTIL.getConnection().getTable(tableName);
+ // insert data
+ insertData(tableName, admin, table);
+ insertData(tableName, admin, table, 20);
+ insertData(tableName, admin, table, 40);
+
+ // Compaction Request
+ store.triggerMajorCompaction();
+ Optional<CompactionContext> compactionContext = store.requestCompaction();
+ assertTrue(compactionContext.isPresent());
+ assertFalse(compactionContext.get().getRequest().isAfterSplit());
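+ // With hbase.hstore.blockingStoreFiles at its default of 16 and three store files flushed
+ // above, getCompactPriority() is expected to come out to 16 - 3 = 13.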
+ assertEquals(13, compactionContext.get().getRequest().getPriority());
+
+ // Split
+ long procId =
+ cluster.getMaster().splitRegion(region.getRegionInfo(), Bytes.toBytes("row4"), 0, 0);
+
+ // wait for the split to complete or get interrupted. If the split completes successfully,
+ // the procedure will return true; if the split fails, the procedure will throw an exception.
+ ProcedureTestingUtility.waitProcedure(cluster.getMaster().getMasterProcedureExecutor(),
+ procId);
+
+ assertEquals(2, cluster.getRegions(tableName).size());
+ // we have 2 daughter regions
+ HRegion hRegion1 = cluster.getRegions(tableName).get(0);
+ HRegion hRegion2 = cluster.getRegions(tableName).get(1);
+ HStore hStore1 = hRegion1.getStore(cf);
+ HStore hStore2 = hRegion2.getStore(cf);
+
+ // For hStore1 and hStore2, set a mock reference on one of the store files
+ StoreFileInfo storeFileInfo1 = new ArrayList<>(hStore1.getStorefiles()).get(0).getFileInfo();
+ StoreFileInfo storeFileInfo2 = new ArrayList<>(hStore2.getStorefiles()).get(0).getFileInfo();
+ Field field = StoreFileInfo.class.getDeclaredField("reference");
+ field.setAccessible(true);
+ field.set(storeFileInfo1, Mockito.mock(Reference.class));
+ field.set(storeFileInfo2, Mockito.mock(Reference.class));
+ hStore1.triggerMajorCompaction();
+ hStore2.triggerMajorCompaction();
+
+ compactionContext = hStore1.requestCompaction();
+ assertTrue(compactionContext.isPresent());
+ // since we set a mock reference on one of the store files, we will get isAfterSplit=true and
+ // the highest priority for hStore1's compactionContext
+ assertTrue(compactionContext.get().getRequest().isAfterSplit());
+ assertEquals(Integer.MIN_VALUE + 1000, compactionContext.get().getRequest().getPriority());
+
+ compactionContext =
+ hStore2.requestCompaction(Integer.MIN_VALUE + 10, CompactionLifeCycleTracker.DUMMY, null);
+ assertTrue(compactionContext.isPresent());
+ // the compaction request carries a higher priority than the default priority of daughter
+ // region compaction (Integer.MIN_VALUE + 1000), hence we expect the request priority to
+ // be accepted.
+ assertTrue(compactionContext.get().getRequest().isAfterSplit());
+ assertEquals(Integer.MIN_VALUE + 10, compactionContext.get().getRequest().getPriority());
+ }
+
public static class FailingSplitMasterObserver implements MasterCoprocessor, MasterObserver {
volatile CountDownLatch latch;
@@ -637,18 +716,21 @@ public class TestSplitTransactionOnCluster {
}
}
- private void insertData(final TableName tableName, Admin admin, Table t) throws IOException,
- InterruptedException {
- Put p = new Put(Bytes.toBytes("row1"));
+ private void insertData(final TableName tableName, Admin admin, Table t) throws IOException {
+ insertData(tableName, admin, t, 1);
+ }
+
+ private void insertData(TableName tableName, Admin admin, Table t, int i) throws IOException {
+ Put p = new Put(Bytes.toBytes("row" + i));
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("1"));
t.put(p);
- p = new Put(Bytes.toBytes("row2"));
+ p = new Put(Bytes.toBytes("row" + (i + 1)));
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("2"));
t.put(p);
- p = new Put(Bytes.toBytes("row3"));
+ p = new Put(Bytes.toBytes("row" + (i + 2)));
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("3"));
t.put(p);
- p = new Put(Bytes.toBytes("row4"));
+ p = new Put(Bytes.toBytes("row" + (i + 3)));
p.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q1"), Bytes.toBytes("4"));
t.put(p);
admin.flush(tableName);