You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2008/11/15 01:36:28 UTC
svn commit: r714200 - in /hadoop/hbase/trunk: ./
src/java/org/apache/hadoop/hbase/regionserver/
src/java/org/apache/hadoop/hbase/util/
Author: stack
Date: Fri Nov 14 16:36:27 2008
New Revision: 714200
URL: http://svn.apache.org/viewvc?rev=714200&view=rev
Log:
HBASE-938 major compaction period is not checked periodically
Modified:
hadoop/hbase/trunk/CHANGES.txt
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java
hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Threads.java
Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=714200&r1=714199&r2=714200&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Fri Nov 14 16:36:27 2008
@@ -119,6 +119,7 @@
HBASE-998 Narrow getClosestRowBefore by passing column family
HBASE-999 Up versions on historian and keep history of deleted regions for a
while rather than delete immediately
+ HBASE-938 Major compaction period is not checked periodically
NEW FEATURES
HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters]
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=714200&r1=714199&r2=714200&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Fri Nov 14 16:36:27 2008
@@ -117,13 +117,16 @@
/**
* @param r HRegion store belongs to
+ * @param why Why compaction requested -- used in debug messages
*/
- public synchronized void compactionRequested(HRegion r) {
+ public synchronized void compactionRequested(final HRegion r,
+ final String why) {
if (this.server.stopRequested.get()) {
return;
}
- LOG.debug("Compaction requested for region: " +
- Bytes.toString(r.getRegionName()));
+ LOG.debug("Compaction requested for region " +
+ Bytes.toString(r.getRegionName()) +
+ (why != null && !why.isEmpty()? " because: " + why: ""));
synchronized (regionsInQueue) {
if (!regionsInQueue.contains(r)) {
compactionQueue.add(r);
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=714200&r1=714199&r2=714200&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Nov 14 16:36:27 2008
@@ -1336,8 +1336,8 @@
try {
for (HStore store : stores.values()) {
List<HStoreKey> keys =
- store.getKeys(new HStoreKey(row, ts, this.regionInfo), ALL_VERSIONS,
- now, null);
+ store.getKeys(new HStoreKey(row, ts, this.regionInfo),
+ ALL_VERSIONS, now, null);
TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>(
new HStoreKey.HStoreKeyWritableComparator(regionInfo));
for (HStoreKey key: keys) {
@@ -1369,7 +1369,8 @@
long now = System.currentTimeMillis();
try {
for (HStore store : stores.values()) {
- List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp, this.regionInfo),
+ List<HStoreKey> keys =
+ store.getKeys(new HStoreKey(row, timestamp, this.regionInfo),
ALL_VERSIONS, now, columnPattern);
TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>(
new HStoreKey.HStoreKeyWritableComparator(regionInfo));
@@ -2400,6 +2401,19 @@
static void listFiles(FileSystem fs, HRegion r) throws IOException {
listPaths(fs, r.getRegionDir());
}
+
+ /**
+ * @return True if needs a major compaction.
+ * @throws IOException
+ */
+ boolean isMajorCompaction() throws IOException {
+ for (HStore store: this.stores.values()) {
+ if (store.isMajorCompaction()) {
+ return true;
+ }
+ }
+ return false;
+ }
/*
* List the files under the specified directory
@@ -2425,4 +2439,4 @@
}
}
}
-}
\ No newline at end of file
+}
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=714200&r1=714199&r2=714200&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Nov 14 16:36:27 2008
@@ -52,6 +52,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@@ -174,32 +175,6 @@
private final LinkedList<byte[]> reservedSpace = new LinkedList<byte []>();
private RegionServerMetrics metrics;
-
- /**
- * Thread to shutdown the region server in an orderly manner. This thread
- * is registered as a shutdown hook in the HRegionServer constructor and is
- * only called when the HRegionServer receives a kill signal.
- */
- class ShutdownThread extends Thread {
- private final HRegionServer instance;
-
- /**
- * @param instance
- */
- public ShutdownThread(HRegionServer instance) {
- this.instance = instance;
- }
-
- @Override
- public void run() {
- LOG.info("Starting shutdown thread.");
-
- // tell the region server to stop and wait for it to complete
- instance.stop();
- instance.join();
- LOG.info("Shutdown thread complete");
- }
- }
// Compactions
final CompactSplitThread compactSplitThread;
@@ -207,6 +182,10 @@
// Cache flushing
final MemcacheFlusher cacheFlusher;
+ /* Check for major compactions.
+ */
+ final Chore majorCompactionChecker;
+
// HLog and HLog roller. log is protected rather than private to avoid
// eclipse warning when accessed by inner classes
protected volatile HLog log;
@@ -260,6 +239,13 @@
// Log flushing thread
this.logFlusher =
new LogFlusher(this.threadWakeFrequency, this.stopRequested);
+
+ // Background thread to check for major compactions; needed if region
+ // has not gotten updates in a while. Make it run at a lesser frequency.
+ int multiplier = this.conf.getInt(THREAD_WAKE_FREQUENCY +
+ ".multiplier", 1000);
+ this.majorCompactionChecker = new MajorCompactionChecker(this,
+ this.threadWakeFrequency * multiplier, this.stopRequested);
// Task thread to process requests from Master
this.worker = new Worker();
@@ -474,6 +460,7 @@
logFlusher.interrupt();
compactSplitThread.interruptIfNecessary();
logRoller.interruptIfNecessary();
+ this.majorCompactionChecker.interrupt();
if (abortRequested) {
if (this.fsOk) {
@@ -571,6 +558,66 @@
throw ex;
}
}
+
+ /*
+ * Thread to shutdown the region server in an orderly manner. This thread
+ * is registered as a shutdown hook in the HRegionServer constructor and is
+ * only called when the HRegionServer receives a kill signal.
+ */
+ private static class ShutdownThread extends Thread {
+ private final Log LOG = LogFactory.getLog(this.getClass());
+ private final HRegionServer instance;
+
+ /**
+ * @param instance
+ */
+ public ShutdownThread(HRegionServer instance) {
+ this.instance = instance;
+ }
+
+ @Override
+ public void run() {
+ LOG.info("Starting shutdown thread.");
+
+ // tell the region server to stop and wait for it to complete
+ instance.stop();
+ instance.join();
+ LOG.info("Shutdown thread complete");
+ }
+ }
+
+ /*
+ * Inner class that runs on a long period checking if regions need major
+ * compaction.
+ */
+ private static class MajorCompactionChecker extends Chore {
+ private final Log LOG = LogFactory.getLog(this.getClass());
+ private final HRegionServer instance;
+
+ MajorCompactionChecker(final HRegionServer h,
+ final int sleepTime, final AtomicBoolean stopper) {
+ super(sleepTime, stopper);
+ this.instance = h;
+ LOG.info("Runs every " + sleepTime + "ms");
+ }
+
+ @Override
+ protected void chore() {
+ Set<Integer> keys = this.instance.onlineRegions.keySet();
+ for (Integer i: keys) {
+ HRegion r = this.instance.onlineRegions.get(i);
+ try {
+ if (r != null && r.isMajorCompaction()) {
+ // Queue a compaction. Will recognize if major is needed.
+ this.instance.compactSplitThread.
+ compactionRequested(r, getName() + " requests major compaction");
+ }
+ } catch (IOException e) {
+ LOG.warn("Failed major compaction check on " + r, e);
+ }
+ }
+ }
+ };
/**
* Report the status of the server. A server is online once all the startup
@@ -660,6 +707,9 @@
Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
handler);
Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
+ Threads.setDaemonThreadRunning(this.majorCompactionChecker,
+ n + ".majorCompactionChecker", handler);
+
// Leases is not a Thread. Internally it runs a daemon thread. If it gets
// an unhandled exception, it will just exit.
this.leases.setName(n + ".leaseChecker");
@@ -690,7 +740,7 @@
// Verify that all threads are alive
if (!(leases.isAlive() && compactSplitThread.isAlive() &&
cacheFlusher.isAlive() && logRoller.isAlive() &&
- workerThread.isAlive())) {
+ workerThread.isAlive() && this.majorCompactionChecker.isAlive())) {
// One or more threads are no longer alive - shut down
stop();
return false;
@@ -750,20 +800,11 @@
* Presumption is that all closes and stops have already been called.
*/
void join() {
- join(this.workerThread);
- join(this.cacheFlusher);
- join(this.compactSplitThread);
- join(this.logRoller);
- }
-
- private void join(final Thread t) {
- while (t.isAlive()) {
- try {
- t.join();
- } catch (InterruptedException e) {
- // continue
- }
- }
+ Threads.shutdown(this.majorCompactionChecker);
+ Threads.shutdown(this.workerThread);
+ Threads.shutdown(this.cacheFlusher);
+ Threads.shutdown(this.compactSplitThread);
+ Threads.shutdown(this.logRoller);
}
/*
@@ -925,13 +966,15 @@
// Force split a region
HRegion region = getRegion(info.getRegionName());
region.regionInfo.shouldSplit(true);
- compactSplitThread.compactionRequested(region);
+ compactSplitThread.compactionRequested(region,
+ "MSG_REGION_SPLIT");
} break;
case MSG_REGION_COMPACT: {
// Compact a region
HRegion region = getRegion(info.getRegionName());
- compactSplitThread.compactionRequested(region);
+ compactSplitThread.compactionRequested(region,
+ "MSG_REGION_COMPACT");
} break;
default:
@@ -983,7 +1026,8 @@
try {
region = instantiateRegion(regionInfo);
// Startup a compaction early if one is needed.
- this.compactSplitThread.compactionRequested(region);
+ this.compactSplitThread.
+ compactionRequested(region, "Region open check");
} catch (IOException e) {
LOG.error("error opening region " + regionInfo.getRegionNameAsString(), e);
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=714200&r1=714199&r2=714200&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Fri Nov 14 16:36:27 2008
@@ -435,6 +435,7 @@
curfile = new HStoreFile(conf, fs, basedir, this.info,
family.getName(), fid, reference);
long storeSeqId = -1;
+ boolean majorCompaction = false;
try {
storeSeqId = curfile.loadInfo(fs);
if (storeSeqId > this.maxSeqId) {
@@ -488,7 +489,9 @@
// Found map and sympathetic info file. Add this hstorefile to result.
if (LOG.isDebugEnabled()) {
LOG.debug("loaded " + FSUtils.getPath(p) + ", isReference=" +
- isReference + ", sequence id=" + storeSeqId + ", length=" + length);
+ isReference + ", sequence id=" + storeSeqId +
+ ", length=" + length + ", majorCompaction=" +
+ curfile.isMajorCompaction());
}
results.put(Long.valueOf(storeSeqId), curfile);
// Keep list of sympathetic data mapfiles for cleaning info dir in next
@@ -691,7 +694,8 @@
" with " + entries +
" entries, sequence id " + logCacheFlushId + ", data size " +
StringUtils.humanReadableInt(flushed) + ", file size " +
- StringUtils.humanReadableInt(newStoreSize));
+ StringUtils.humanReadableInt(newStoreSize) + " to " +
+ this.info.getRegionNameAsString());
}
}
return storefiles.size() >= compactionThreshold;
@@ -832,11 +836,11 @@
// Check to see if we need to do a major compaction on this region.
// If so, change doMajorCompaction to true to skip the incremental
// compacting below. Only check if doMajorCompaction is not true.
- long lastMajorCompaction = 0L;
if (!doMajorCompaction) {
- doMajorCompaction = isMajorCompaction();
+ doMajorCompaction = isMajorCompaction(filesToCompact);
}
- if (!doMajorCompaction && !hasReferences(filesToCompact) &&
+ boolean references = hasReferences(filesToCompact);
+ if (!doMajorCompaction && !references &&
filesToCompact.size() < compactionThreshold) {
return checkSplit(forceSplit);
}
@@ -862,7 +866,7 @@
fileSizes[i] = len;
totalSize += len;
}
- if (!doMajorCompaction && !hasReferences(filesToCompact)) {
+ if (!doMajorCompaction && !references) {
// Here we select files for incremental compaction.
// The rule is: if the largest(oldest) one is more than twice the
// size of the second, skip the largest, and continue to next...,
@@ -888,7 +892,7 @@
if (LOG.isDebugEnabled()) {
LOG.debug("Compaction size of " + this.storeNameStr + ": " +
StringUtils.humanReadableInt(totalSize) + "; Skipped " + point +
- " files , size: " + skipped);
+ " file(s), size: " + skipped);
}
}
@@ -904,7 +908,8 @@
HStoreFile compactedOutputFile = new HStoreFile(conf, fs,
this.compactionDir, this.info, family.getName(), -1L, null);
if (LOG.isDebugEnabled()) {
- LOG.debug("started compaction of " + rdrs.size() + " files into " +
+ LOG.debug("Started compaction of " + rdrs.size() + " file(s)" +
+ (references? "(hasReferences=true)": " ") + " into " +
FSUtils.getPath(compactedOutputFile.getMapFilePath()));
}
MapFile.Writer writer = compactedOutputFile.getWriter(this.fs,
@@ -917,15 +922,14 @@
}
// Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
- compactedOutputFile.writeInfo(fs, maxId);
+ compactedOutputFile.writeInfo(fs, maxId, doMajorCompaction);
// Move the compaction into place.
completeCompaction(filesToCompact, compactedOutputFile);
if (LOG.isDebugEnabled()) {
- LOG.debug("Completed compaction of " + this.storeNameStr +
- " store size is " + StringUtils.humanReadableInt(storeSize) +
- (doMajorCompaction? "": "; time since last major compaction: " +
- (lastMajorCompaction/1000) + " seconds"));
+ LOG.debug("Completed " + (doMajorCompaction? "major": "") +
+ " compaction of " + this.storeNameStr +
+ " store size is " + StringUtils.humanReadableInt(storeSize));
}
}
return checkSplit(forceSplit);
@@ -955,19 +959,40 @@
/*
* @return True if we should run a major compaction.
*/
- private boolean isMajorCompaction() throws IOException {
+ boolean isMajorCompaction() throws IOException {
+ return isMajorCompaction(null);
+ }
+
+ /*
+ * @param filesToCompact Files to compact. Can be null.
+ * @return True if we should run a major compaction.
+ */
+ private boolean isMajorCompaction(final List<HStoreFile> filesToCompact)
+ throws IOException {
boolean result = false;
Path mapdir = HStoreFile.getMapDir(this.basedir, this.info.getEncodedName(),
this.family.getName());
long lowTimestamp = getLowestTimestamp(fs, mapdir);
if (lowTimestamp < (System.currentTimeMillis() - this.majorCompactionTime) &&
lowTimestamp > 0l) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Major compaction triggered on store: " +
- this.storeNameStr + ". Time since last major compaction: " +
- ((System.currentTimeMillis() - lowTimestamp)/1000) + " seconds");
+ // Major compaction time has elapsed.
+ long elapsedTime = System.currentTimeMillis() - lowTimestamp;
+ if (filesToCompact != null && filesToCompact.size() == 1 &&
+ filesToCompact.get(0).isMajorCompaction() &&
+ (this.ttl == HConstants.FOREVER || elapsedTime < this.ttl)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping major compaction because only one (major) " +
+ "compacted file only and elapsedTime " + elapsedTime +
+ " is < ttl=" + this.ttl);
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Major compaction triggered on store: " +
+ this.storeNameStr + ". Time since last major compaction: " +
+ ((System.currentTimeMillis() - lowTimestamp)/1000) + " seconds");
+ }
+ result = true;
}
- result = true;
}
return result;
}
@@ -1160,7 +1185,8 @@
try {
// 1. Moving the new MapFile into place.
HStoreFile finalCompactedFile = new HStoreFile(conf, fs, basedir,
- this.info, family.getName(), -1, null);
+ this.info, family.getName(), -1, null,
+ compactedFile.isMajorCompaction());
if (LOG.isDebugEnabled()) {
LOG.debug("moving " + FSUtils.getPath(compactedFile.getMapFilePath()) +
" to " + FSUtils.getPath(finalCompactedFile.getMapFilePath()));
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java?rev=714200&r1=714199&r2=714200&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java Fri Nov 14 16:36:27 2008
@@ -83,6 +83,7 @@
public class HStoreFile implements HConstants {
static final Log LOG = LogFactory.getLog(HStoreFile.class.getName());
static final byte INFO_SEQ_NUM = 0;
+ static final byte MAJOR_COMPACTION = INFO_SEQ_NUM + 1;
static final String HSTORE_DATFILE_DIR = "mapfiles";
static final String HSTORE_INFO_DIR = "info";
static final String HSTORE_FILTER_DIR = "filter";
@@ -97,6 +98,9 @@
private final FileSystem fs;
private final Reference reference;
private final HRegionInfo hri;
+ /* If true, this file was product of a major compaction.
+ */
+ private boolean majorCompaction = false;
/**
* Constructor that fully initializes the object
@@ -112,6 +116,24 @@
final HRegionInfo hri, byte [] colFamily, long fileId,
final Reference ref)
throws IOException {
+ this(conf, fs, basedir, hri, colFamily, fileId, ref, false);
+ }
+
+ /**
+ * Constructor that fully initializes the object
+ * @param conf Configuration object
+ * @param basedir qualified path that is parent of region directory
+ * @param colFamily name of the column family
+ * @param fileId file identifier
+ * @param ref Reference to another HStoreFile.
+ * @param hri The region info for this file (HACK HBASE-868). TODO: Fix.
+ * @param mc True if this file was the result of a major compaction.
+ * @throws IOException
+ */
+ HStoreFile(HBaseConfiguration conf, FileSystem fs, Path basedir,
+ final HRegionInfo hri, byte [] colFamily, long fileId,
+ final Reference ref, final boolean mc)
+ throws IOException {
this.conf = conf;
this.fs = fs;
this.basedir = basedir;
@@ -133,6 +155,7 @@
// If a reference, construction does not write the pointer files. Thats
// done by invocations of writeReferenceFiles(hsf, fs). Happens at split.
this.reference = ref;
+ this.majorCompaction = mc;
}
/** @return the region name */
@@ -288,11 +311,11 @@
/**
* Reads in an info file
*
- * @param fs file system
+ * @param filesystem file system
* @return The sequence id contained in the info file
* @throws IOException
*/
- long loadInfo(FileSystem fs) throws IOException {
+ long loadInfo(final FileSystem filesystem) throws IOException {
Path p = null;
if (isReference()) {
p = getInfoFilePath(reference.getEncodedRegionName(),
@@ -300,10 +323,18 @@
} else {
p = getInfoFilePath();
}
- DataInputStream in = new DataInputStream(fs.open(p));
+ long length = filesystem.getFileStatus(p).getLen();
+ boolean hasMoreThanSeqNum = length > (Byte.SIZE + Bytes.SIZEOF_LONG);
+ DataInputStream in = new DataInputStream(filesystem.open(p));
try {
byte flag = in.readByte();
- if(flag == INFO_SEQ_NUM) {
+ if (flag == INFO_SEQ_NUM) {
+ if (hasMoreThanSeqNum) {
+ flag = in.readByte();
+ if (flag == MAJOR_COMPACTION) {
+ this.majorCompaction = in.readBoolean();
+ }
+ }
return in.readLong();
}
throw new IOException("Cannot process log file: " + p);
@@ -315,16 +346,37 @@
/**
* Writes the file-identifier to disk
*
- * @param fs file system
+ * @param filesystem file system
* @param infonum file id
* @throws IOException
*/
- void writeInfo(FileSystem fs, long infonum) throws IOException {
+ void writeInfo(final FileSystem filesystem, final long infonum)
+ throws IOException {
+ writeInfo(filesystem, infonum, false);
+ }
+
+ /**
+ * Writes the file-identifier to disk
+ *
+ * @param filesystem file system
+ * @param infonum file id
+ * @param mc True if this file is product of a major compaction
+ * @throws IOException
+ */
+ void writeInfo(final FileSystem filesystem, final long infonum,
+ final boolean mc)
+ throws IOException {
Path p = getInfoFilePath();
- FSDataOutputStream out = fs.create(p);
+ FSDataOutputStream out = filesystem.create(p);
try {
out.writeByte(INFO_SEQ_NUM);
out.writeLong(infonum);
+ if (mc) {
+ // Record the major compaction flag on this file.
+ this.majorCompaction = mc;
+ out.writeByte(MAJOR_COMPACTION);
+ out.writeBoolean(mc);
+ }
} finally {
out.close();
}
@@ -430,6 +482,13 @@
return encodedRegionName + "/" + Bytes.toString(colFamily) + "/" + fileId +
(isReference()? "-" + reference.toString(): "");
}
+
+ /**
+ * @return True if this file was made by a major compaction.
+ */
+ public boolean isMajorCompaction() {
+ return this.majorCompaction;
+ }
private static String createHStoreFilename(final long fid,
final int encodedRegionName) {
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java?rev=714200&r1=714199&r2=714200&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java Fri Nov 14 16:36:27 2008
@@ -177,7 +177,7 @@
// compact if removeFromQueue is true. Note that region.flushCache()
// only returns true if a flush is done and if a compaction is needed.
if (region.flushcache() && !removeFromQueue) {
- server.compactSplitThread.compactionRequested(region);
+ server.compactSplitThread.compactionRequested(region, getName());
}
} catch (DroppedSnapshotException ex) {
// Cache flush can fail in a few places. If it fails in a critical
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Threads.java?rev=714200&r1=714199&r2=714200&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Threads.java Fri Nov 14 16:36:27 2008
@@ -57,4 +57,18 @@
t.start();
return t;
}
-}
\ No newline at end of file
+
+ /**
+ * Shutdown passed thread using isAlive and join.
+ * @param t Thread to shutdown
+ */
+ public static void shutdown(final Thread t) {
+ while (t.isAlive()) {
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ LOG.warn(t.getName(), e);
+ }
+ }
+ }
+}