You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2012/11/27 06:23:15 UTC
svn commit: r1414000 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/compactions/
test/java/org/apache/hadoop/hbase/regionserver/
Author: stack
Date: Tue Nov 27 05:23:14 2012
New Revision: 1414000
URL: http://svn.apache.org/viewvc?rev=1414000&view=rev
Log:
HBASE-7110 refactor the compaction selection and config code similarly to 0.89-fb changes; REVERT of original patch and ADDENDUM because applied old patch originally, v8
Removed:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfiguration.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreUtils.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1414000&r1=1413999&r2=1414000&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Nov 27 05:23:14 2012
@@ -4172,7 +4172,7 @@ public class HRegion implements HeapSize
}
/**
- * @return True if needs a major compaction.
+ * @return True if needs a mojor compaction.
* @throws IOException
*/
boolean isMajorCompaction() throws IOException {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1414000&r1=1413999&r2=1414000&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Tue Nov 27 05:23:14 2012
@@ -19,7 +19,6 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
-import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -64,7 +63,9 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.InvalidHFileException;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.regionserver.compactions.*;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ChecksumType;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -74,6 +75,8 @@ import org.apache.hadoop.hbase.util.FSUt
import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
+import com.google.common.base.Predicate;
+import com.google.common.collect.Collections2;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -100,9 +103,8 @@ import com.google.common.collect.Lists;
* <p>Locking and transactions are handled at a higher level. This API should
* not be called directly but by an HRegion manager.
*/
-//TODO: move StoreConfiguration implementation into a separate class.
@InterfaceAudience.Private
-public class HStore implements Store, StoreConfiguration {
+public class HStore implements Store {
static final Log LOG = LogFactory.getLog(HStore.class);
protected final MemStore memstore;
@@ -110,12 +112,15 @@ public class HStore implements Store, St
private final Path homedir;
private final HRegion region;
private final HColumnDescriptor family;
- CompactionPolicy compactionPolicy;
final FileSystem fs;
final Configuration conf;
final CacheConfig cacheConf;
- // ttl in milliseconds. TODO: can this be removed? Already stored in scanInfo.
+ // ttl in milliseconds.
private long ttl;
+ private final int minFilesToCompact;
+ private final int maxFilesToCompact;
+ private final long minCompactSize;
+ private final long maxCompactSize;
private long lastCompactSize = 0;
volatile boolean forceMajor = false;
/* how many bytes to write between status checks */
@@ -188,7 +193,7 @@ public class HStore implements Store, St
this.comparator = info.getComparator();
// Get TTL
- this.ttl = determineTTLFromFamily(family);
+ this.ttl = getTTL(family);
// used by ScanQueryMatcher
long timeToPurgeDeletes =
Math.max(conf.getLong("hbase.hstore.time.to.purge.deletes", 0), 0);
@@ -199,11 +204,23 @@ public class HStore implements Store, St
scanInfo = new ScanInfo(family, ttl, timeToPurgeDeletes, this.comparator);
this.memstore = new MemStore(conf, this.comparator);
+ // By default, compact if storefile.count >= minFilesToCompact
+ this.minFilesToCompact = Math.max(2,
+ conf.getInt("hbase.hstore.compaction.min",
+ /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
+ LOG.info("hbase.hstore.compaction.min = " + this.minFilesToCompact);
+
// Setting up cache configuration for this family
this.cacheConf = new CacheConfig(conf, family);
this.blockingStoreFileCount =
conf.getInt("hbase.hstore.blockingStoreFiles", 7);
+ this.maxFilesToCompact = conf.getInt("hbase.hstore.compaction.max", 10);
+ this.minCompactSize = conf.getLong("hbase.hstore.compaction.min.size",
+ this.region.memstoreFlushSize);
+ this.maxCompactSize
+ = conf.getLong("hbase.hstore.compaction.max.size", Long.MAX_VALUE);
+
this.verifyBulkLoads = conf.getBoolean("hbase.hstore.bulkload.verify", false);
if (HStore.closeCheckInterval == 0) {
@@ -217,16 +234,14 @@ public class HStore implements Store, St
// initilize bytes per checksum
this.bytesPerChecksum = getBytesPerChecksum(conf);
// Create a compaction tool instance
- this.compactor = new Compactor(conf);
- // Create a compaction manager.
- this.compactionPolicy = new CompactionPolicy(conf, this);
+ this.compactor = new Compactor(this.conf);
}
/**
* @param family
* @return
*/
- private static long determineTTLFromFamily(final HColumnDescriptor family) {
+ long getTTL(final HColumnDescriptor family) {
// HCD.getTimeToLive returns ttl in seconds. Convert to milliseconds.
long ttl = family.getTimeToLive();
if (ttl == HConstants.FOREVER) {
@@ -270,22 +285,6 @@ public class HStore implements Store, St
return this.fs;
}
- /* Implementation of StoreConfiguration */
- public long getStoreFileTtl() {
- // TTL only applies if there's no MIN_VERSIONs setting on the column.
- return (this.scanInfo.getMinVersions() == 0) ? this.ttl : Long.MAX_VALUE;
- }
-
- public Long getMajorCompactionPeriod() {
- String strCompactionTime = this.family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
- return (strCompactionTime != null) ? new Long(strCompactionTime) : null;
- }
-
- public long getMemstoreFlushSize() {
- return this.region.memstoreFlushSize;
- }
- /* End implementation of StoreConfiguration */
-
/**
* Returns the configured bytesPerChecksum value.
* @param conf The configuration
@@ -353,8 +352,7 @@ public class HStore implements Store, St
* @param family family name of this store
* @return Path to the family/Store home directory
*/
- public static Path getStoreHomedir(final Path parentRegionDirectory,
- final byte[] family) {
+ public static Path getStoreHomedir(final Path parentRegionDirectory, final byte[] family) {
return new Path(parentRegionDirectory, new Path(Bytes.toString(family)));
}
/**
@@ -568,8 +566,7 @@ public class HStore implements Store, St
"the destination store. Copying file over to destination filesystem.");
Path tmpPath = getTmpPath();
FileUtil.copy(srcFs, srcPath, fs, tmpPath, false, conf);
- LOG.info("Copied " + srcPath
- + " to temporary path on destination filesystem: " + tmpPath);
+ LOG.info("Copied " + srcPath + " to temporary path on destination filesystem: " + tmpPath);
srcPath = tmpPath;
}
@@ -666,8 +663,8 @@ public class HStore implements Store, St
/**
* Snapshot this stores memstore. Call before running
- * {@link #flushCache(long, SortedSet, TimeRangeTracker, AtomicLong, MonitoredTask)}
- * so it has some work to do.
+ * {@link #flushCache(long, SortedSet, TimeRangeTracker, AtomicLong, MonitoredTask)} so it has
+ * some work to do.
*/
void snapshot() {
this.memstore.snapshot();
@@ -725,8 +722,7 @@ public class HStore implements Store, St
InternalScanner scanner = null;
KeyValueScanner memstoreScanner = new CollectionBackedScanner(set, this.comparator);
if (getHRegion().getCoprocessorHost() != null) {
- scanner = getHRegion().getCoprocessorHost()
- .preFlushScannerOpen(this, memstoreScanner);
+ scanner = getHRegion().getCoprocessorHost().preFlushScannerOpen(this, memstoreScanner);
}
if (scanner == null) {
Scan scan = new Scan();
@@ -763,8 +759,7 @@ public class HStore implements Store, St
if (!kvs.isEmpty()) {
for (KeyValue kv : kvs) {
// If we know that this KV is going to be included always, then let us
- // set its memstoreTS to 0. This will help us save space when writing to
- // disk.
+ // set its memstoreTS to 0. This will help us save space when writing to disk.
if (kv.getMemstoreTS() <= smallestReadPoint) {
// let us not change the original KV. It could be in the memstore
// changing its memstoreTS could affect other threads/scanners.
@@ -779,8 +774,7 @@ public class HStore implements Store, St
} while (hasMore);
} finally {
// Write out the log sequence number that corresponds to this output
- // hfile. Also write current time in metadata as minFlushTime.
- // The hfile is current up to and including logCacheFlushId.
+ // hfile. The hfile is current up to and including logCacheFlushId.
status.setStatus("Flushing " + this + ": appending metadata");
writer.appendMetadata(logCacheFlushId, false);
status.setStatus("Flushing " + this + ": closing flushed file");
@@ -1010,12 +1004,12 @@ public class HStore implements Store, St
// Ready to go. Have list of files to compact.
LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
- + this + " of " + this.region.getRegionInfo().getRegionNameAsString()
+ + this + " of "
+ + this.region.getRegionInfo().getRegionNameAsString()
+ " into tmpdir=" + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
+ StringUtils.humanReadableInt(cr.getSize()));
StoreFile sf = null;
- long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
try {
StoreFile.Writer writer =
this.compactor.compact(this, filesToCompact, cr.isMajor(), maxId);
@@ -1037,7 +1031,6 @@ public class HStore implements Store, St
}
}
- long now = EnvironmentEdgeManager.currentTimeMillis();
LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
+ filesToCompact.size() + " file(s) in " + this + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
@@ -1045,11 +1038,8 @@ public class HStore implements Store, St
(sf == null ? "none" : sf.getPath().getName()) +
", size=" + (sf == null ? "none" :
StringUtils.humanReadableInt(sf.getReader().length()))
- + "; total size for store is " + StringUtils.humanReadableInt(storeSize)
- + ". This selection was in queue for "
- + StringUtils.formatTimeDiff(compactionStartTime, cr.getSelectionTime())
- + ", and took " + StringUtils.formatTimeDiff(now, compactionStartTime)
- + " to execute.");
+ + "; total size for store is "
+ + StringUtils.humanReadableInt(storeSize));
return sf;
}
@@ -1104,7 +1094,38 @@ public class HStore implements Store, St
@Override
public boolean hasReferences() {
- return StoreUtils.hasReferences(this.storefiles);
+ return hasReferences(this.storefiles);
+ }
+
+ /*
+ * @param files
+ * @return True if any of the files in <code>files</code> are References.
+ */
+ private boolean hasReferences(Collection<StoreFile> files) {
+ if (files != null && files.size() > 0) {
+ for (StoreFile hsf: files) {
+ if (hsf.isReference()) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /*
+ * Gets lowest timestamp from candidate StoreFiles
+ *
+ * @param fs
+ * @param dir
+ * @throws IOException
+ */
+ public static long getLowestTimestamp(final List<StoreFile> candidates)
+ throws IOException {
+ long minTs = Long.MAX_VALUE;
+ for (StoreFile storeFile : candidates) {
+ minTs = Math.min(minTs, storeFile.getModificationTimeStamp());
+ }
+ return minTs;
}
@Override
@@ -1122,7 +1143,91 @@ public class HStore implements Store, St
}
List<StoreFile> candidates = new ArrayList<StoreFile>(this.storefiles);
- return compactionPolicy.isMajorCompaction(candidates);
+
+ // exclude files above the max compaction threshold
+ // except: save all references. we MUST compact them
+ int pos = 0;
+ while (pos < candidates.size() &&
+ candidates.get(pos).getReader().length() > this.maxCompactSize &&
+ !candidates.get(pos).isReference()) ++pos;
+ candidates.subList(0, pos).clear();
+
+ return isMajorCompaction(candidates);
+ }
+
+ /*
+ * @param filesToCompact Files to compact. Can be null.
+ * @return True if we should run a major compaction.
+ */
+ private boolean isMajorCompaction(final List<StoreFile> filesToCompact) throws IOException {
+ boolean result = false;
+ long mcTime = getNextMajorCompactTime();
+ if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
+ return result;
+ }
+ // TODO: Use better method for determining stamp of last major (HBASE-2990)
+ long lowTimestamp = getLowestTimestamp(filesToCompact);
+ long now = System.currentTimeMillis();
+ if (lowTimestamp > 0l && lowTimestamp < (now - mcTime)) {
+ // Major compaction time has elapsed.
+ if (filesToCompact.size() == 1) {
+ // Single file
+ StoreFile sf = filesToCompact.get(0);
+ long oldest =
+ (sf.getReader().timeRangeTracker == null) ?
+ Long.MIN_VALUE :
+ now - sf.getReader().timeRangeTracker.minimumTimestamp;
+ if (sf.isMajorCompaction() &&
+ (this.ttl == HConstants.FOREVER || oldest < this.ttl)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping major compaction of " + this +
+ " because one (major) compacted file only and oldestTime " +
+ oldest + "ms is < ttl=" + this.ttl);
+ }
+ } else if (this.ttl != HConstants.FOREVER && oldest > this.ttl) {
+ LOG.debug("Major compaction triggered on store " + this +
+ ", because keyvalues outdated; time since last major compaction " +
+ (now - lowTimestamp) + "ms");
+ result = true;
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Major compaction triggered on store " + this +
+ "; time since last major compaction " + (now - lowTimestamp) + "ms");
+ }
+ result = true;
+ }
+ }
+ return result;
+ }
+
+ long getNextMajorCompactTime() {
+ // default = 24hrs
+ long ret = conf.getLong(HConstants.MAJOR_COMPACTION_PERIOD, 1000*60*60*24);
+ if (family.getValue(HConstants.MAJOR_COMPACTION_PERIOD) != null) {
+ String strCompactionTime =
+ family.getValue(HConstants.MAJOR_COMPACTION_PERIOD);
+ ret = (new Long(strCompactionTime)).longValue();
+ }
+
+ if (ret > 0) {
+ // default = 20% = +/- 4.8 hrs
+ double jitterPct = conf.getFloat("hbase.hregion.majorcompaction.jitter",
+ 0.20F);
+ if (jitterPct > 0) {
+ long jitter = Math.round(ret * jitterPct);
+ // deterministic jitter avoids a major compaction storm on restart
+ ImmutableList<StoreFile> snapshot = storefiles;
+ if (snapshot != null && !snapshot.isEmpty()) {
+ String seed = snapshot.get(0).getPath().getName();
+ double curRand = new Random(seed.hashCode()).nextDouble();
+ ret += jitter - Math.round(2L * jitter * curRand);
+ } else {
+ ret = 0; // no storefiles == no major compaction
+ }
+ }
+ }
+ return ret;
}
public CompactionRequest requestCompaction() throws IOException {
@@ -1158,11 +1263,9 @@ public class HStore implements Store, St
CompactSelection filesToCompact;
if (override) {
// coprocessor is overriding normal file selection
- filesToCompact = new CompactSelection(candidates);
+ filesToCompact = new CompactSelection(conf, candidates);
} else {
- boolean isUserCompaction = priority == Store.PRIORITY_USER;
- filesToCompact = compactionPolicy.selectCompaction(candidates, isUserCompaction,
- forceMajor && filesCompacting.isEmpty());
+ filesToCompact = compactSelection(candidates, priority);
}
if (region.getCoprocessorHost() != null) {
@@ -1185,17 +1288,12 @@ public class HStore implements Store, St
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
// major compaction iff all StoreFiles are included
- boolean isMajor =
- (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
+ boolean isMajor = (filesToCompact.getFilesToCompact().size() == this.storefiles.size());
if (isMajor) {
// since we're enqueuing a major, update the compaction wait interval
this.forceMajor = false;
}
- LOG.debug(getHRegion().regionInfo.getEncodedName() + " - " +
- getColumnFamilyName() + ": Initiating " +
- (isMajor ? "major" : "minor") + " compaction");
-
// everything went better than expected. create a compaction request
int pri = getCompactPriority(priority);
ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
@@ -1218,6 +1316,191 @@ public class HStore implements Store, St
}
/**
+ * Algorithm to choose which files to compact, see {@link #compactSelection(java.util.List, int)}
+ * @param candidates
+ * @return
+ * @throws IOException
+ */
+ CompactSelection compactSelection(List<StoreFile> candidates) throws IOException {
+ return compactSelection(candidates,Store.NO_PRIORITY);
+ }
+
+ /**
+ * Algorithm to choose which files to compact
+ *
+ * Configuration knobs:
+ * "hbase.hstore.compaction.ratio"
+ * normal case: minor compact when file <= sum(smaller_files) * ratio
+ * "hbase.hstore.compaction.min.size"
+ * unconditionally compact individual files below this size
+ * "hbase.hstore.compaction.max.size"
+ * never compact individual files above this size (unless splitting)
+ * "hbase.hstore.compaction.min"
+ * min files needed to minor compact
+ * "hbase.hstore.compaction.max"
+ * max files to compact at once (avoids OOM)
+ *
+ * @param candidates candidate files, ordered from oldest to newest
+ * @return subset copy of candidate list that meets compaction criteria
+ * @throws IOException
+ */
+ CompactSelection compactSelection(List<StoreFile> candidates, int priority)
+ throws IOException {
+ // ASSUMPTION!!! filesCompacting is locked when calling this function
+
+ /* normal skew:
+ *
+ * older ----> newer
+ * _
+ * | | _
+ * | | | | _
+ * --|-|- |-|- |-|---_-------_------- minCompactSize
+ * | | | | | | | | _ | |
+ * | | | | | | | | | | | |
+ * | | | | | | | | | | | |
+ */
+ CompactSelection compactSelection = new CompactSelection(conf, candidates);
+
+ boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
+ if (!forcemajor) {
+ // Delete the expired store files before the compaction selection.
+ if (conf.getBoolean("hbase.store.delete.expired.storefile", true)
+ && (ttl != Long.MAX_VALUE) && (this.scanInfo.minVersions == 0)) {
+ CompactSelection expiredSelection = compactSelection
+ .selectExpiredStoreFilesToCompact(
+ EnvironmentEdgeManager.currentTimeMillis() - this.ttl);
+
+ // If there is any expired store files, delete them by compaction.
+ if (expiredSelection != null) {
+ return expiredSelection;
+ }
+ }
+ // do not compact old files above a configurable threshold
+ // save all references. we MUST compact them
+ int pos = 0;
+ while (pos < compactSelection.getFilesToCompact().size() &&
+ compactSelection.getFilesToCompact().get(pos).getReader().length()
+ > maxCompactSize &&
+ !compactSelection.getFilesToCompact().get(pos).isReference()) ++pos;
+ if (pos != 0) compactSelection.clearSubList(0, pos);
+ }
+
+ if (compactSelection.getFilesToCompact().isEmpty()) {
+ LOG.debug(this.getHRegionInfo().getEncodedName() + " - " +
+ this + ": no store files to compact");
+ compactSelection.emptyFileList();
+ return compactSelection;
+ }
+
+ // Force a major compaction if this is a user-requested major compaction,
+ // or if we do not have too many files to compact and this was requested
+ // as a major compaction
+ boolean majorcompaction = (forcemajor && priority == Store.PRIORITY_USER) ||
+ (forcemajor || isMajorCompaction(compactSelection.getFilesToCompact())) &&
+ (compactSelection.getFilesToCompact().size() < this.maxFilesToCompact
+ );
+ LOG.debug(this.getHRegionInfo().getEncodedName() + " - "
+ + this.getColumnFamilyName() + ": Initiating " +
+ (majorcompaction ? "major" : "minor") + "compaction");
+
+ if (!majorcompaction &&
+ !hasReferences(compactSelection.getFilesToCompact())) {
+ // we're doing a minor compaction, let's see what files are applicable
+ int start = 0;
+ double r = compactSelection.getCompactSelectionRatio();
+
+ // remove bulk import files that request to be excluded from minors
+ compactSelection.getFilesToCompact().removeAll(Collections2.filter(
+ compactSelection.getFilesToCompact(),
+ new Predicate<StoreFile>() {
+ public boolean apply(StoreFile input) {
+ return input.excludeFromMinorCompaction();
+ }
+ }));
+
+ // skip selection algorithm if we don't have enough files
+ if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Not compacting files because we only have " +
+ compactSelection.getFilesToCompact().size() +
+ " files ready for compaction. Need " + this.minFilesToCompact + " to initiate.");
+ }
+ compactSelection.emptyFileList();
+ return compactSelection;
+ }
+
+ /* TODO: add sorting + unit test back in when HBASE-2856 is fixed
+ // Sort files by size to correct when normal skew is altered by bulk load.
+ Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
+ */
+
+ // get store file sizes for incremental compacting selection.
+ int countOfFiles = compactSelection.getFilesToCompact().size();
+ long [] fileSizes = new long[countOfFiles];
+ long [] sumSize = new long[countOfFiles];
+ for (int i = countOfFiles-1; i >= 0; --i) {
+ StoreFile file = compactSelection.getFilesToCompact().get(i);
+ fileSizes[i] = file.getReader().length();
+ // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
+ int tooFar = i + this.maxFilesToCompact - 1;
+ sumSize[i] = fileSizes[i]
+ + ((i+1 < countOfFiles) ? sumSize[i+1] : 0)
+ - ((tooFar < countOfFiles) ? fileSizes[tooFar] : 0);
+ }
+
+ /* Start at the oldest file and stop when you find the first file that
+ * meets compaction criteria:
+ * (1) a recently-flushed, small file (i.e. <= minCompactSize)
+ * OR
+ * (2) within the compactRatio of sum(newer_files)
+ * Given normal skew, any newer files will also meet this criteria
+ *
+ * Additional Note:
+ * If fileSizes.size() >> maxFilesToCompact, we will recurse on
+ * compact(). Consider the oldest files first to avoid a
+ * situation where we always compact [end-threshold,end). Then, the
+ * last file becomes an aggregate of the previous compactions.
+ */
+ while(countOfFiles - start >= this.minFilesToCompact &&
+ fileSizes[start] >
+ Math.max(minCompactSize, (long)(sumSize[start+1] * r))) {
+ ++start;
+ }
+ int end = Math.min(countOfFiles, start + this.maxFilesToCompact);
+ long totalSize = fileSizes[start]
+ + ((start+1 < countOfFiles) ? sumSize[start+1] : 0);
+ compactSelection = compactSelection.getSubList(start, end);
+
+ // if we don't have enough files to compact, just wait
+ if (compactSelection.getFilesToCompact().size() < this.minFilesToCompact) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipped compaction of " + this
+ + ". Only " + (end - start) + " file(s) of size "
+ + StringUtils.humanReadableInt(totalSize)
+ + " have met compaction criteria.");
+ }
+ compactSelection.emptyFileList();
+ return compactSelection;
+ }
+ } else {
+ if(majorcompaction) {
+ if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
+ LOG.debug("Warning, compacting more than " + this.maxFilesToCompact +
+ " files, probably because of a user-requested major compaction");
+ if(priority != Store.PRIORITY_USER) {
+ LOG.error("Compacting more than max files on a non user-requested compaction");
+ }
+ }
+ } else if (compactSelection.getFilesToCompact().size() > this.maxFilesToCompact) {
+ // all files included in this compaction, up to max
+ int pastMax = compactSelection.getFilesToCompact().size() - this.maxFilesToCompact;
+ compactSelection.getFilesToCompact().subList(0, pastMax).clear();
+ }
+ }
+ return compactSelection;
+ }
+
+ /**
* Validates a store file by opening and closing it. In HFileV2 this should
* not be an expensive operation.
*
@@ -1314,8 +1597,8 @@ public class HStore implements Store, St
// let the archive util decide if we should archive or delete the files
LOG.debug("Removing store files after compaction...");
- HFileArchiver.archiveStoreFiles(this.fs, this.region, this.conf,
- this.family.getName(), compactedFiles);
+ HFileArchiver.archiveStoreFiles(this.fs, this.region, this.conf, this.family.getName(),
+ compactedFiles);
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
@@ -1722,7 +2005,11 @@ public class HStore implements Store, St
@Override
public boolean throttleCompaction(long compactionSize) {
- return compactionPolicy.throttleCompaction(compactionSize);
+ // see HBASE-5867 for discussion on the default
+ long throttlePoint = conf.getLong(
+ "hbase.regionserver.thread.compaction.throttle",
+ 2 * this.minFilesToCompact * this.region.memstoreFlushSize);
+ return compactionSize > throttlePoint;
}
@Override
@@ -1828,7 +2115,7 @@ public class HStore implements Store, St
@Override
public boolean needsCompaction() {
- return compactionPolicy.needsCompaction(storefiles.size() - filesCompacting.size());
+ return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
}
@Override
@@ -1837,8 +2124,8 @@ public class HStore implements Store, St
}
public static final long FIXED_OVERHEAD =
- ClassSize.align((20 * ClassSize.REFERENCE) + (4 * Bytes.SIZEOF_LONG)
- + (3 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
+ ClassSize.align((19 * ClassSize.REFERENCE) + (6 * Bytes.SIZEOF_LONG)
+ + (5 * Bytes.SIZEOF_INT) + Bytes.SIZEOF_BOOLEAN);
public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1414000&r1=1413999&r2=1414000&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Nov 27 05:23:14 2012
@@ -65,7 +65,6 @@ import org.apache.hadoop.hbase.util.Bloo
import org.apache.hadoop.hbase.util.BloomFilterFactory;
import org.apache.hadoop.hbase.util.BloomFilterWriter;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.RawComparator;
@@ -296,7 +295,7 @@ public class StoreFile {
* @return True if this is a StoreFile Reference; call after {@link #open()}
* else may get wrong answer.
*/
- public boolean isReference() {
+ boolean isReference() {
return this.reference != null;
}
@@ -358,7 +357,7 @@ public class StoreFile {
/**
* @return True if this file was made by a major compaction.
*/
- public boolean isMajorCompaction() {
+ boolean isMajorCompaction() {
if (this.majorCompaction == null) {
throw new NullPointerException("This has not been set yet");
}
@@ -368,7 +367,7 @@ public class StoreFile {
/**
* @return True if this file should not be part of a minor compaction.
*/
- public boolean excludeFromMinorCompaction() {
+ boolean excludeFromMinorCompaction() {
return this.excludeFromMinorCompaction;
}
@@ -580,6 +579,7 @@ public class StoreFile {
}
}
}
+
this.reader.setSequenceID(this.sequenceid);
b = metadataMap.get(HFileWriterV2.MAX_MEMSTORE_TS_KEY);
@@ -945,11 +945,6 @@ public class StoreFile {
return r.write(fs, p);
}
- public Long getMinimumTimestamp() {
- return (getReader().timeRangeTracker == null) ?
- null :
- getReader().timeRangeTracker.minimumTimestamp;
- }
/**
* A StoreFile writer. Use this to read/write HBase Store Files. It is package
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java?rev=1414000&r1=1413999&r2=1414000&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java Tue Nov 27 05:23:14 2012
@@ -19,13 +19,15 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@InterfaceAudience.Private
public class CompactSelection {
@@ -46,15 +48,37 @@ public class CompactSelection {
*/
private final static Object compactionCountLock = new Object();
+ // HBase conf object
+ Configuration conf;
// was this compaction promoted to an off-peak
boolean isOffPeakCompaction = false;
- // CompactSelection object creation time.
- private final long selectionTime;
+ // compactRatio: double on purpose! Float.MAX < Long.MAX < Double.MAX
+ // With float, java will downcast your long to float for comparisons (bad)
+ private double compactRatio;
+ // compaction ratio off-peak
+ private double compactRatioOffPeak;
+ // offpeak start time
+ private int offPeakStartHour = -1;
+ // off peak end time
+ private int offPeakEndHour = -1;
- public CompactSelection(List<StoreFile> filesToCompact) {
- this.selectionTime = EnvironmentEdgeManager.currentTimeMillis();
+ public CompactSelection(Configuration conf, List<StoreFile> filesToCompact) {
this.filesToCompact = filesToCompact;
- this.isOffPeakCompaction = false;
+ this.conf = conf;
+ this.compactRatio = conf.getFloat("hbase.hstore.compaction.ratio", 1.2F);
+ this.compactRatioOffPeak = conf.getFloat("hbase.hstore.compaction.ratio.offpeak", 5.0F);
+
+ // Peak time is from [offPeakStartHour, offPeakEndHour). Valid numbers are [0, 23]
+ this.offPeakStartHour = conf.getInt("hbase.offpeak.start.hour", -1);
+ this.offPeakEndHour = conf.getInt("hbase.offpeak.end.hour", -1);
+ if (!isValidHour(this.offPeakStartHour) || !isValidHour(this.offPeakEndHour)) {
+ if (!(this.offPeakStartHour == -1 && this.offPeakEndHour == -1)) {
+ LOG.warn("Invalid start/end hour for peak hour : start = " +
+ this.offPeakStartHour + " end = " + this.offPeakEndHour +
+ ". Valid numbers are [0-23]");
+ }
+ this.offPeakStartHour = this.offPeakEndHour = -1;
+ }
}
/**
@@ -89,25 +113,49 @@ public class CompactSelection {
}
if (hasExpiredStoreFiles) {
- expiredSFSelection = new CompactSelection(expiredStoreFiles);
+ expiredSFSelection = new CompactSelection(conf, expiredStoreFiles);
}
return expiredSFSelection;
}
/**
+ * If the current hour falls in the off peak times and there are no
+ * outstanding off peak compactions, the current compaction is
+ * promoted to an off peak compaction. Currently only one off peak
+ * compaction is present in the compaction queue.
+ *
+ * @return the off-peak compaction ratio if this compaction was promoted
+ *         to off-peak; otherwise the default compaction ratio
+ */
+ public double getCompactSelectionRatio() {
+ double r = this.compactRatio;
+ synchronized(compactionCountLock) {
+ if (isOffPeakHour() && numOutstandingOffPeakCompactions == 0) {
+ r = this.compactRatioOffPeak;
+ numOutstandingOffPeakCompactions++;
+ isOffPeakCompaction = true;
+ }
+ }
+ if(isOffPeakCompaction) {
+ LOG.info("Running an off-peak compaction, selection ratio = " +
+ compactRatioOffPeak + ", numOutstandingOffPeakCompactions is now " +
+ numOutstandingOffPeakCompactions);
+ }
+ return r;
+ }
+
+ /**
* The current compaction finished, so reset the off peak compactions count
* if this was an off peak compaction.
*/
public void finishRequest() {
if (isOffPeakCompaction) {
- long newValueToLog = -1;
synchronized(compactionCountLock) {
- assert !isOffPeakCompaction : "Double-counting off-peak count for compaction";
- newValueToLog = --numOutstandingOffPeakCompactions;
+ numOutstandingOffPeakCompactions--;
isOffPeakCompaction = false;
}
LOG.info("Compaction done, numOutstandingOffPeakCompactions is now " +
- newValueToLog);
+ numOutstandingOffPeakCompactions);
}
}
@@ -122,14 +170,13 @@ public class CompactSelection {
public void emptyFileList() {
filesToCompact.clear();
if (isOffPeakCompaction) {
- long newValueToLog = -1;
synchronized(compactionCountLock) {
// reset the off peak count
- newValueToLog = --numOutstandingOffPeakCompactions;
+ numOutstandingOffPeakCompactions--;
isOffPeakCompaction = false;
}
LOG.info("Nothing to compact, numOutstandingOffPeakCompactions is now " +
- newValueToLog);
+ numOutstandingOffPeakCompactions);
}
}
@@ -137,30 +184,16 @@ public class CompactSelection {
return this.isOffPeakCompaction;
}
- public static long getNumOutStandingOffPeakCompactions() {
- synchronized(compactionCountLock) {
- return numOutstandingOffPeakCompactions;
+ private boolean isOffPeakHour() {
+ int currentHour = (new GregorianCalendar()).get(Calendar.HOUR_OF_DAY);
+ // If off-peak time checking is disabled, just return false.
+ if (this.offPeakStartHour == this.offPeakEndHour) {
+ return false;
}
- }
-
- /**
- * Tries making the compaction off-peak.
- * Only checks internal compaction constraints, not timing.
- * @return Eventual value of isOffPeakCompaction.
- */
- public boolean trySetOffpeak() {
- assert !isOffPeakCompaction : "Double-setting off-peak for compaction " + this;
- synchronized(compactionCountLock) {
- if (numOutstandingOffPeakCompactions == 0) {
- numOutstandingOffPeakCompactions++;
- isOffPeakCompaction = true;
- }
+ if (this.offPeakStartHour < this.offPeakEndHour) {
+ return (currentHour >= this.offPeakStartHour && currentHour < this.offPeakEndHour);
}
- return isOffPeakCompaction;
- }
-
- public long getSelectionTime() {
- return selectionTime;
+ return (currentHour >= this.offPeakStartHour || currentHour < this.offPeakEndHour);
}
public CompactSelection subList(int start, int end) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1414000&r1=1413999&r2=1414000&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Tue Nov 27 05:23:14 2012
@@ -208,10 +208,6 @@ public class CompactionRequest implement
return p;
}
- public long getSelectionTime() {
- return compactSelection.getSelectionTime();
- }
-
/** Gets the priority for the request */
public void setPriority(int p) {
this.p = p;
@@ -275,7 +271,7 @@ public class CompactionRequest implement
server.checkFileSystem();
} finally {
s.finishRequest(this);
- LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
+ LOG.debug("CompactSplitThread status: " + server.compactSplitThread);
}
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1414000&r1=1413999&r2=1414000&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Tue Nov 27 05:23:14 2012
@@ -49,7 +49,8 @@ import org.apache.hadoop.hbase.io.encodi
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.regionserver.compactions.*;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.experimental.categories.Category;
@@ -301,7 +302,6 @@ public class TestCompaction extends HBas
conf.setFloat("hbase.hregion.majorcompaction.jitter", jitterPct);
HStore s = ((HStore) r.getStore(COLUMN_FAMILY));
- s.compactionPolicy.updateConfiguration(conf, s);
try {
createStoreFile(r);
createStoreFile(r);
@@ -313,11 +313,9 @@ public class TestCompaction extends HBas
assertEquals(2, s.getStorefilesCount());
// ensure that major compaction time is deterministic
- CompactionPolicy c = s.compactionPolicy;
- List<StoreFile> storeFiles = s.getStorefiles();
- long mcTime = c.getNextMajorCompactTime(storeFiles);
+ long mcTime = s.getNextMajorCompactTime();
for (int i = 0; i < 10; ++i) {
- assertEquals(mcTime, c.getNextMajorCompactTime(storeFiles));
+ assertEquals(mcTime, s.getNextMajorCompactTime());
}
// ensure that the major compaction time is within the variance
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1414000&r1=1413999&r2=1414000&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Tue Nov 27 05:23:14 2012
@@ -248,15 +248,17 @@ public class TestStore extends TestCase
flush(i);
}
// after flush; check the lowest time stamp
- long lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
- long lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
- assertEquals(lowestTimeStampFromManager,lowestTimeStampFromFS);
-
+ long lowestTimeStampFromStore =
+ HStore.getLowestTimestamp(store.getStorefiles());
+ long lowestTimeStampFromFS =
+ getLowestTimeStampFromFS(fs,store.getStorefiles());
+ assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
+
// after compact; check the lowest time stamp
store.compact(store.requestCompaction());
- lowestTimeStampFromManager = StoreUtils.getLowestTimestamp(store.getStorefiles());
- lowestTimeStampFromFS = getLowestTimeStampFromFS(fs, store.getStorefiles());
- assertEquals(lowestTimeStampFromManager, lowestTimeStampFromFS);
+ lowestTimeStampFromStore = HStore.getLowestTimestamp(store.getStorefiles());
+ lowestTimeStampFromFS = getLowestTimeStampFromFS(fs,store.getStorefiles());
+ assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
}
private static long getLowestTimeStampFromFS(FileSystem fs,