You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/05/19 00:17:09 UTC
svn commit: r1340283 - in /hbase/branches/0.94/src:
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/
Author: stack
Date: Fri May 18 22:17:09 2012
New Revision: 1340283
URL: http://svn.apache.org/viewvc?rev=1340283&view=rev
Log:
HBASE-5920 New Compactions Logic can silently prevent user-initiated compactions from occurring
Modified:
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1340283&r1=1340282&r2=1340283&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Fri May 18 22:17:09 2012
@@ -49,12 +49,6 @@ public class CompactSplitThread implemen
private final ThreadPoolExecutor splits;
private final long throttleSize;
- /* The default priority for user-specified compaction requests.
- * The user gets top priority unless we have blocking compactions. (Pri <= 0)
- */
- public static final int PRIORITY_USER = 1;
- public static final int NO_PRIORITY = Integer.MIN_VALUE;
-
/**
* Splitting should not take place if the total number of regions exceed this.
* This is not a hard limit to the number of regions but it is a guideline to
@@ -145,7 +139,7 @@ public class CompactSplitThread implemen
public synchronized boolean requestSplit(final HRegion r) {
// don't split regions that are blocking
- if (shouldSplitRegion() && r.getCompactPriority() >= PRIORITY_USER) {
+ if (shouldSplitRegion() && r.getCompactPriority() >= Store.PRIORITY_USER) {
byte[] midKey = r.checkSplit();
if (midKey != null) {
requestSplit(r, midKey);
@@ -174,13 +168,13 @@ public class CompactSplitThread implemen
public synchronized void requestCompaction(final HRegion r,
final String why) {
for(Store s : r.getStores().values()) {
- requestCompaction(r, s, why, NO_PRIORITY);
+ requestCompaction(r, s, why, Store.NO_PRIORITY);
}
}
public synchronized void requestCompaction(final HRegion r, final Store s,
final String why) {
- requestCompaction(r, s, why, NO_PRIORITY);
+ requestCompaction(r, s, why, Store.NO_PRIORITY);
}
public synchronized void requestCompaction(final HRegion r, final String why,
@@ -201,10 +195,10 @@ public class CompactSplitThread implemen
if (this.server.isStopped()) {
return;
}
- CompactionRequest cr = s.requestCompaction();
+ CompactionRequest cr = s.requestCompaction(priority);
if (cr != null) {
cr.setServer(server);
- if (priority != NO_PRIORITY) {
+ if (priority != Store.NO_PRIORITY) {
cr.setPriority(priority);
}
ThreadPoolExecutor pool = largeCompactions;
@@ -222,6 +216,11 @@ public class CompactSplitThread implemen
+ (why != null && !why.isEmpty() ? "; Because: " + why : "")
+ "; " + this);
}
+ } else {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Not compacting " + r.getRegionNameAsString() +
+ " because compaction request was cancelled");
+ }
}
}
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1340283&r1=1340282&r2=1340283&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri May 18 22:17:09 2012
@@ -2915,9 +2915,11 @@ public class HRegionServer implements HR
if (major) {
region.triggerMajorCompaction();
}
+ LOG.trace("User-triggered compaction requested for region " +
+ region.getRegionNameAsString());
compactSplitThread.requestCompaction(region, "User-triggered "
+ (major ? "major " : "") + "compaction",
- CompactSplitThread.PRIORITY_USER);
+ Store.PRIORITY_USER);
}
/** @return the info server */
Modified: hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1340283&r1=1340282&r2=1340283&view=diff
==============================================================================
--- hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.94/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri May 18 22:17:09 2012
@@ -107,6 +107,7 @@ import com.google.common.collect.Lists;
*/
public class Store extends SchemaConfigured implements HeapSize {
static final Log LOG = LogFactory.getLog(Store.class);
+
protected final MemStore memstore;
// This stores directory in the filesystem.
private final Path homedir;
@@ -135,6 +136,12 @@ public class Store extends SchemaConfigu
private final int compactionKVMax;
private final boolean verifyBulkLoads;
+ /* The default priority for user-specified compaction requests.
+ * The user gets top priority unless we have blocking compactions. (Pri <= 0)
+ */
+ public static final int PRIORITY_USER = 1;
+ public static final int NO_PRIORITY = Integer.MIN_VALUE;
+
// not private for testing
/* package */ScanInfo scanInfo;
/*
@@ -170,7 +177,7 @@ public class Store extends SchemaConfigu
* @param region
* @param family HColumnDescriptor for this column
* @param fs file system object
- * @param conf configuration object
+ * @param confParam configuration object
* failed. Can be null.
* @throws IOException
*/
@@ -317,7 +324,7 @@ public class Store extends SchemaConfigu
public Path getHomedir() {
return homedir;
}
-
+
/**
* @return the data block encoder
*/
@@ -435,7 +442,7 @@ public class Store extends SchemaConfigu
/**
* Removes a kv from the memstore. The KeyValue is removed only
- * if its key & memstoreTS matches the key & memstoreTS value of the
+ * if its key & memstoreTS matches the key & memstoreTS value of the
* kv parameter.
*
* @param kv
@@ -521,8 +528,8 @@ public class Store extends SchemaConfigu
}
/**
- * This method should only be called from HRegion. It is assumed that the
- * ranges of values in the HFile fit within the stores assigned region.
+ * This method should only be called from HRegion. It is assumed that the
+ * ranges of values in the HFile fit within the stores assigned region.
* (assertBulkLoadHFileOk checks this)
*/
void bulkLoadHFile(String srcPathStr) throws IOException {
@@ -602,7 +609,7 @@ public class Store extends SchemaConfigu
ThreadPoolExecutor storeFileCloserThreadPool = this.region
.getStoreFileOpenAndCloseThreadPool("StoreFileCloserThread-"
+ this.family.getNameAsString());
-
+
// close each store file in parallel
CompletionService<Void> completionService =
new ExecutorCompletionService<Void>(storeFileCloserThreadPool);
@@ -614,7 +621,7 @@ public class Store extends SchemaConfigu
}
});
}
-
+
try {
for (int i = 0; i < result.size(); i++) {
Future<Void> future = completionService.take();
@@ -743,7 +750,7 @@ public class Store extends SchemaConfigu
scanner.close();
}
if (LOG.isInfoEnabled()) {
- LOG.info("Flushed " +
+ LOG.info("Flushed " +
", sequenceid=" + logCacheFlushId +
", memsize=" + StringUtils.humanReadableInt(flushed) +
", into tmp file " + pathName);
@@ -954,7 +961,7 @@ public class Store extends SchemaConfigu
* <p>We don't want to hold the structureLock for the whole time, as a compact()
* can be lengthy and we want to allow cache-flushes during this period.
*
- * @param CompactionRequest
+ * @param cr
* compaction details obtained from requestCompaction()
* @throws IOException
*/
@@ -1187,7 +1194,7 @@ public class Store extends SchemaConfigu
if (jitterPct > 0) {
long jitter = Math.round(ret * jitterPct);
// deterministic jitter avoids a major compaction storm on restart
- ImmutableList<StoreFile> snapshot = storefiles;
+ ImmutableList<StoreFile> snapshot = storefiles;
if (snapshot != null && !snapshot.isEmpty()) {
String seed = snapshot.get(0).getPath().getName();
double curRand = new Random(seed.hashCode()).nextDouble();
@@ -1201,6 +1208,10 @@ public class Store extends SchemaConfigu
}
public CompactionRequest requestCompaction() {
+ return requestCompaction(NO_PRIORITY);
+ }
+
+ public CompactionRequest requestCompaction(int priority) {
// don't even select for compaction if writes are disabled
if (!this.region.areWritesEnabled()) {
return null;
@@ -1231,7 +1242,7 @@ public class Store extends SchemaConfigu
// coprocessor is overriding normal file selection
filesToCompact = new CompactSelection(conf, candidates);
} else {
- filesToCompact = compactSelection(candidates);
+ filesToCompact = compactSelection(candidates, priority);
}
if (region.getCoprocessorHost() != null) {
@@ -1261,7 +1272,7 @@ public class Store extends SchemaConfigu
}
// everything went better than expected. create a compaction request
- int pri = getCompactPriority();
+ int pri = getCompactPriority(priority);
ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
}
} catch (IOException ex) {
@@ -1281,6 +1292,16 @@ public class Store extends SchemaConfigu
}
/**
+ * Algorithm to choose which files to compact, see {@link #compactSelection(java.util.List, int)}
+ * @param candidates store files to select from, handed to the two-argument overload as-is
+ * @return the selection made by {@link #compactSelection(java.util.List, int)} with NO_PRIORITY
+ * @throws IOException if the two-argument overload throws it
+ */
+ CompactSelection compactSelection(List<StoreFile> candidates) throws IOException {
+ return compactSelection(candidates,NO_PRIORITY);
+ }
+
+ /**
* Algorithm to choose which files to compact
*
* Configuration knobs:
@@ -1299,7 +1320,7 @@ public class Store extends SchemaConfigu
* @return subset copy of candidate list that meets compaction criteria
* @throws IOException
*/
- CompactSelection compactSelection(List<StoreFile> candidates)
+ CompactSelection compactSelection(List<StoreFile> candidates, int priority)
throws IOException {
// ASSUMPTION!!! filesCompacting is locked when calling this function
@@ -1347,10 +1368,16 @@ public class Store extends SchemaConfigu
return compactSelection;
}
- // major compact on user action or age (caveat: we have too many files)
- boolean majorcompaction =
- (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact()))
- && compactSelection.getFilesToCompact().size() < this.maxFilesToCompact;
+ // A user-requested (PRIORITY_USER) forced major compaction is always major.
+ // Otherwise a forced or due (isMajorCompaction) major compaction only goes
+ // ahead when fewer than maxFilesToCompact files have been selected.
+ boolean majorcompaction = (forcemajor && priority == PRIORITY_USER) ||
+ (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
+ (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
+ );
+ LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
+ this.getColumnFamilyName() + ": Initiating " +
+ (majorcompaction ? "major" : "minor") + " compaction");
if (!majorcompaction &&
!hasReferences(compactSelection.getFilesToCompact())) {
@@ -1360,6 +1387,11 @@ public class Store extends SchemaConfigu
// skip selection algorithm if we don't have enough files
if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Not compacting files because we only have " +
+ compactSelection.getFilesToCompact().size() +
+ " files ready for compaction. Need " + this.minFilesToCompact + " to initiate.");
+ }
compactSelection.emptyFileList();
return compactSelection;
}
@@ -1427,11 +1459,18 @@ public class Store extends SchemaConfigu
return compactSelection;
}
} else {
- // all files included in this compaction, up to max
- if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
- int pastMax =
- compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
- compactSelection.clearSubList(0, pastMax);
+ if(majorcompaction) {
+ if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
+ LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
+ " files, probably because of a user-requested major compaction");
+ if(priority != PRIORITY_USER) {
+ LOG.error("Compacting more than max files on a non user-requested compaction");
+ }
+ }
+ } else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
+ // all files included in this compaction, up to max
+ int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
+ compactSelection.getFilesToCompact().subList(0, pastMax).clear();
}
}
return compactSelection;
@@ -2093,11 +2132,21 @@ public class Store extends SchemaConfigu
return this.memstore.heapSize();
}
+ public int getCompactPriority() {
+ return getCompactPriority(NO_PRIORITY);
+ }
+
/**
* @return The priority that this store should have in the compaction queue
+ * @param priority requested priority; PRIORITY_USER is returned unchanged, any other value is ignored
*/
- public int getCompactPriority() {
- return this.blockingStoreFileCount - this.storefiles.size();
+ public int getCompactPriority(int priority) {
+ // If this is a user-requested compaction, leave this at the highest priority
+ if(priority == PRIORITY_USER) {
+ return PRIORITY_USER;
+ } else {
+ return this.blockingStoreFileCount - this.storefiles.size();
+ }
}
HRegion getHRegion() {
@@ -2225,7 +2274,7 @@ public class Store extends SchemaConfigu
return this.cacheConf;
}
- public static final long FIXED_OVERHEAD =
+ public static final long FIXED_OVERHEAD =
ClassSize.align(SchemaConfigured.SCHEMA_CONFIGURED_UNALIGNED_HEAP_SIZE +
+ (20 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+ (6 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
@@ -2303,7 +2352,7 @@ public class Store extends SchemaConfigu
public boolean getKeepDeletedCells() {
return keepDeletedCells;
}
-
+
public long getTimeToPurgeDeletes() {
return timeToPurgeDeletes;
}
Modified: hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1340283&r1=1340282&r2=1340283&view=diff
==============================================================================
--- hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/branches/0.94/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Fri May 18 22:17:09 2012
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.experimental.categories.Category;
@@ -76,6 +77,7 @@ public class TestCompaction extends HBas
private int compactionThreshold;
private byte[] firstRowBytes, secondRowBytes, thirdRowBytes;
final private byte[] col1, col2;
+ private static final long MAX_FILES_TO_COMPACT = 10;
/** constructor */
public TestCompaction() throws Exception {
@@ -612,6 +614,43 @@ public class TestCompaction extends HBas
fail("testCompactionWithCorruptResult failed since no exception was" +
"thrown while completing a corrupt file");
}
+
+ /**
+ * Test for HBASE-5920 - a major compaction requested with Store.NO_PRIORITY is not run as major when there are too many store files
+ */
+ public void testNonUserMajorCompactionRequest() throws Exception {
+ Store store = r.getStore(COLUMN_FAMILY);
+ createStoreFile(r);
+ for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
+ createStoreFile(r);
+ }
+ store.triggerMajorCompaction();
+
+ CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY);
+ assertNotNull("Expected to receive a compaction request", request);
+ assertEquals(
+ "System-requested major compaction should not occur if there are too many store files",
+ false,
+ request.isMajor());
+ }
+
+ /**
+ * Test for HBASE-5920 - a major compaction requested with Store.PRIORITY_USER stays major even when there are too many store files
+ */
+ public void testUserMajorCompactionRequest() throws IOException{
+ Store store = r.getStore(COLUMN_FAMILY);
+ createStoreFile(r);
+ for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
+ createStoreFile(r);
+ }
+ store.triggerMajorCompaction();
+ CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER);
+ assertNotNull("Expected to receive a compaction request", request);
+ assertEquals(
+ "User-requested major compaction should always occur, even if there are too many store files",
+ true,
+ request.isMajor());
+ }
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =