Posted to commits@hbase.apache.org by st...@apache.org on 2008/11/15 01:36:28 UTC

svn commit: r714200 - in /hadoop/hbase/trunk: ./ src/java/org/apache/hadoop/hbase/regionserver/ src/java/org/apache/hadoop/hbase/util/

Author: stack
Date: Fri Nov 14 16:36:27 2008
New Revision: 714200

URL: http://svn.apache.org/viewvc?rev=714200&view=rev
Log:
HBASE-938 major compaction period is not checked periodically

Modified:
    hadoop/hbase/trunk/CHANGES.txt
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java
    hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Threads.java

Modified: hadoop/hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/CHANGES.txt?rev=714200&r1=714199&r2=714200&view=diff
==============================================================================
--- hadoop/hbase/trunk/CHANGES.txt (original)
+++ hadoop/hbase/trunk/CHANGES.txt Fri Nov 14 16:36:27 2008
@@ -119,6 +119,7 @@
    HBASE-998   Narrow getClosestRowBefore by passing column family
    HBASE-999   Up versions on historian and keep history of deleted regions for a
                while rather than delete immediately
+   HBASE-938   Major compaction period is not checked periodically
         
   NEW FEATURES
    HBASE-875   Use MurmurHash instead of JenkinsHash [in bloomfilters]

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=714200&r1=714199&r2=714200&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Fri Nov 14 16:36:27 2008
@@ -117,13 +117,16 @@
   
   /**
    * @param r HRegion store belongs to
+   * @param why Why compaction requested -- used in debug messages
    */
-  public synchronized void compactionRequested(HRegion r) {
+  public synchronized void compactionRequested(final HRegion r,
+      final String why) {
     if (this.server.stopRequested.get()) {
       return;
     }
-    LOG.debug("Compaction requested for region: " +
-      Bytes.toString(r.getRegionName()));
+    LOG.debug("Compaction requested for region " +
+      Bytes.toString(r.getRegionName()) +
+      (why != null && !why.isEmpty()? " because: " + why: ""));
     synchronized (regionsInQueue) {
       if (!regionsInQueue.contains(r)) {
         compactionQueue.add(r);

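The new second argument is informational only: when non-empty it is appended to the DEBUG line, which makes it possible to tell flush-triggered, master-triggered and period-triggered requests apart in the logs. A minimal sketch of the new call pattern (the caller and region name below are hypothetical):

    // Hypothetical caller; the reason string only decorates the DEBUG message, e.g.
    //   Compaction requested for region example,,1226707200 because: regionserver.cacheFlusher
    compactSplitThread.compactionRequested(region, Thread.currentThread().getName());
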
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=714200&r1=714199&r2=714200&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Nov 14 16:36:27 2008
@@ -1336,8 +1336,8 @@
     try {
       for (HStore store : stores.values()) {
         List<HStoreKey> keys =
-          store.getKeys(new HStoreKey(row, ts, this.regionInfo), ALL_VERSIONS,
-            now, null);
+          store.getKeys(new HStoreKey(row, ts, this.regionInfo),
+            ALL_VERSIONS, now, null);
         TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>(
           new HStoreKey.HStoreKeyWritableComparator(regionInfo));
         for (HStoreKey key: keys) {
@@ -1369,7 +1369,8 @@
     long now = System.currentTimeMillis();
     try {
       for (HStore store : stores.values()) {
-        List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp, this.regionInfo),
+        List<HStoreKey> keys =
+          store.getKeys(new HStoreKey(row, timestamp, this.regionInfo),
             ALL_VERSIONS, now, columnPattern);
           TreeMap<HStoreKey, byte []> edits = new TreeMap<HStoreKey, byte []>(
             new HStoreKey.HStoreKeyWritableComparator(regionInfo));
@@ -2400,6 +2401,19 @@
   static void listFiles(FileSystem fs, HRegion r) throws IOException {
     listPaths(fs, r.getRegionDir());
   }
+
+  /**
+   * @return True if the region needs a major compaction.
+   * @throws IOException 
+   */
+  boolean isMajorCompaction() throws IOException {
+    for (HStore store: this.stores.values()) {
+      if (store.isMajorCompaction()) {
+        return true;
+      }
+    }
+    return false;
+  }
   
   /*
    * List the files under the specified directory
@@ -2425,4 +2439,4 @@
       }
     }
   }
-}
\ No newline at end of file
+}

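The new HRegion.isMajorCompaction() simply asks each store and answers true as soon as any one store wants a major compaction; the periodic checker added below uses it to decide whether to queue a request. A sketch of that call site (the reason string here is illustrative):

    // Sketch of how the new region-level hook is consumed (see MajorCompactionChecker below).
    if (region.isMajorCompaction()) {
      compactSplitThread.compactionRequested(region, "major compaction period elapsed");
    }
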
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=714200&r1=714199&r2=714200&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Fri Nov 14 16:36:27 2008
@@ -52,6 +52,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Chore;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -174,32 +175,6 @@
   private final LinkedList<byte[]> reservedSpace = new LinkedList<byte []>();
   
   private RegionServerMetrics metrics;
-  
-  /**
-   * Thread to shutdown the region server in an orderly manner.  This thread
-   * is registered as a shutdown hook in the HRegionServer constructor and is
-   * only called when the HRegionServer receives a kill signal.
-   */
-  class ShutdownThread extends Thread {
-    private final HRegionServer instance;
-    
-    /**
-     * @param instance
-     */
-    public ShutdownThread(HRegionServer instance) {
-      this.instance = instance;
-    }
-
-    @Override
-    public void run() {
-      LOG.info("Starting shutdown thread.");
-      
-      // tell the region server to stop and wait for it to complete
-      instance.stop();
-      instance.join();
-      LOG.info("Shutdown thread complete");
-    }    
-  }
 
   // Compactions
   final CompactSplitThread compactSplitThread;
@@ -207,6 +182,10 @@
   // Cache flushing  
   final MemcacheFlusher cacheFlusher;
   
+  /* Check for major compactions.
+   */
+  final Chore majorCompactionChecker;
+
   // HLog and HLog roller.  log is protected rather than private to avoid
   // eclipse warning when accessed by inner classes
   protected volatile HLog log;
@@ -260,6 +239,13 @@
     // Log flushing thread
     this.logFlusher =
       new LogFlusher(this.threadWakeFrequency, this.stopRequested);
+    
+    // Background thread to check for major compactions; needed if region
+    // has not gotten updates in a while.  Make it run at a lesser frequency.
+    int multiplier = this.conf.getInt(THREAD_WAKE_FREQUENCY +
+        ".multiplier", 1000);
+    this.majorCompactionChecker = new MajorCompactionChecker(this,
+      this.threadWakeFrequency * multiplier,  this.stopRequested);
 
     // Task thread to process requests from Master
     this.worker = new Worker();
@@ -474,6 +460,7 @@
     logFlusher.interrupt();
     compactSplitThread.interruptIfNecessary();
     logRoller.interruptIfNecessary();
+    this.majorCompactionChecker.interrupt();
 
     if (abortRequested) {
       if (this.fsOk) {
@@ -571,6 +558,66 @@
       throw ex;
     }
   }
+
+  /*
+   * Thread to shutdown the region server in an orderly manner.  This thread
+   * is registered as a shutdown hook in the HRegionServer constructor and is
+   * only called when the HRegionServer receives a kill signal.
+   */
+  private static class ShutdownThread extends Thread {
+    private final Log LOG = LogFactory.getLog(this.getClass());
+    private final HRegionServer instance;
+    
+    /**
+     * @param instance
+     */
+    public ShutdownThread(HRegionServer instance) {
+      this.instance = instance;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("Starting shutdown thread.");
+      
+      // tell the region server to stop and wait for it to complete
+      instance.stop();
+      instance.join();
+      LOG.info("Shutdown thread complete");
+    }    
+  }
+
+  /*
+   * Inner class that runs on a long period checking if regions need major
+   * compaction.
+   */
+  private static class MajorCompactionChecker extends Chore {
+    private final Log LOG = LogFactory.getLog(this.getClass());
+    private final HRegionServer instance;
+    
+    MajorCompactionChecker(final HRegionServer h,
+        final int sleepTime, final AtomicBoolean stopper) {
+      super(sleepTime, stopper);
+      this.instance = h;
+      LOG.info("Runs every " + sleepTime + "ms");
+    }
+
+    @Override
+    protected void chore() {
+      Set<Integer> keys = this.instance.onlineRegions.keySet();
+      for (Integer i: keys) {
+        HRegion r = this.instance.onlineRegions.get(i);
+        try {
+          if (r != null && r.isMajorCompaction()) {
+            // Queue a compaction.  Will recognize if major is needed.
+            this.instance.compactSplitThread.
+              compactionRequested(r, getName() + " requests major compaction");
+          }
+        } catch (IOException e) {
+          LOG.warn("Failed major compaction check on " + r, e);
+        }
+      }
+    }
+  };
   
   /**
    * Report the status of the server. A server is online once all the startup 
@@ -660,6 +707,9 @@
     Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
         handler);
     Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
+    Threads.setDaemonThreadRunning(this.majorCompactionChecker,
+        n + ".majorCompactionChecker", handler);
+    
     // Leases is not a Thread. Internally it runs a daemon thread.  If it gets
     // an unhandled exception, it will just exit.
     this.leases.setName(n + ".leaseChecker");
@@ -690,7 +740,7 @@
     // Verify that all threads are alive
     if (!(leases.isAlive() && compactSplitThread.isAlive() &&
         cacheFlusher.isAlive() && logRoller.isAlive() &&
-        workerThread.isAlive())) {
+        workerThread.isAlive() && this.majorCompactionChecker.isAlive())) {
       // One or more threads are no longer alive - shut down
       stop();
       return false;
@@ -750,20 +800,11 @@
    * Presumption is that all closes and stops have already been called.
    */
   void join() {
-    join(this.workerThread);
-    join(this.cacheFlusher);
-    join(this.compactSplitThread);
-    join(this.logRoller);
-  }
-
-  private void join(final Thread t) {
-    while (t.isAlive()) {
-      try {
-        t.join();
-      } catch (InterruptedException e) {
-        // continue
-      }
-    }
+    Threads.shutdown(this.majorCompactionChecker);
+    Threads.shutdown(this.workerThread);
+    Threads.shutdown(this.cacheFlusher);
+    Threads.shutdown(this.compactSplitThread);
+    Threads.shutdown(this.logRoller);
   }
   
   /*
@@ -925,13 +966,15 @@
               // Force split a region
               HRegion region = getRegion(info.getRegionName());
               region.regionInfo.shouldSplit(true);
-              compactSplitThread.compactionRequested(region);
+              compactSplitThread.compactionRequested(region,
+                "MSG_REGION_SPLIT");
             } break;
 
             case MSG_REGION_COMPACT: {
               // Compact a region
               HRegion region = getRegion(info.getRegionName());
-              compactSplitThread.compactionRequested(region);
+              compactSplitThread.compactionRequested(region,
+                "MSG_REGION_COMPACT");
             } break;
 
             default:
@@ -983,7 +1026,8 @@
       try {
         region = instantiateRegion(regionInfo);
         // Startup a compaction early if one is needed.
-        this.compactSplitThread.compactionRequested(region);
+        this.compactSplitThread.
+          compactionRequested(region, "Region open check");
       } catch (IOException e) {
         LOG.error("error opening region " + regionInfo.getRegionNameAsString(), e);
 

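The checker's period is the region server's wake frequency multiplied by a new hbase.server.thread.wakefrequency.multiplier setting (default 1000), so it runs far less often than the other chores. A rough sketch of the arithmetic, assuming the usual 10-second default for hbase.server.thread.wakefrequency:

    // Sketch only; the 10 * 1000 ms default wake frequency is an assumption here.
    int threadWakeFrequency = conf.getInt("hbase.server.thread.wakefrequency", 10 * 1000);
    int multiplier = conf.getInt("hbase.server.thread.wakefrequency.multiplier", 1000);
    int checkIntervalMs = threadWakeFrequency * multiplier; // 10,000,000 ms, roughly every 2.8 hours
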
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=714200&r1=714199&r2=714200&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStore.java Fri Nov 14 16:36:27 2008
@@ -435,6 +435,7 @@
       curfile = new HStoreFile(conf, fs, basedir, this.info,
         family.getName(), fid, reference);
       long storeSeqId = -1;
+      boolean majorCompaction = false;
       try {
         storeSeqId = curfile.loadInfo(fs);
         if (storeSeqId > this.maxSeqId) {
@@ -488,7 +489,9 @@
       // Found map and sympathetic info file.  Add this hstorefile to result.
       if (LOG.isDebugEnabled()) {
         LOG.debug("loaded " + FSUtils.getPath(p) + ", isReference=" +
-          isReference + ", sequence id=" + storeSeqId + ", length=" + length);
+          isReference + ", sequence id=" + storeSeqId +
+          ", length=" + length + ", majorCompaction=" +
+          curfile.isMajorCompaction());
       }
       results.put(Long.valueOf(storeSeqId), curfile);
       // Keep list of sympathetic data mapfiles for cleaning info dir in next
@@ -691,7 +694,8 @@
           " with " + entries +
           " entries, sequence id " + logCacheFlushId + ", data size " +
           StringUtils.humanReadableInt(flushed) + ", file size " +
-          StringUtils.humanReadableInt(newStoreSize));
+          StringUtils.humanReadableInt(newStoreSize) + " to " +
+          this.info.getRegionNameAsString());
       }
     }
     return storefiles.size() >= compactionThreshold;
@@ -832,11 +836,11 @@
       // Check to see if we need to do a major compaction on this region.
       // If so, change doMajorCompaction to true to skip the incremental
       // compacting below. Only check if doMajorCompaction is not true.
-      long lastMajorCompaction = 0L;
       if (!doMajorCompaction) {
-        doMajorCompaction = isMajorCompaction();
+        doMajorCompaction = isMajorCompaction(filesToCompact);
       }
-      if (!doMajorCompaction && !hasReferences(filesToCompact) &&
+      boolean references = hasReferences(filesToCompact);
+      if (!doMajorCompaction && !references &&
           filesToCompact.size() < compactionThreshold) {
         return checkSplit(forceSplit);
       }
@@ -862,7 +866,7 @@
         fileSizes[i] = len;
         totalSize += len;
       }
-      if (!doMajorCompaction && !hasReferences(filesToCompact)) {
+      if (!doMajorCompaction && !references) {
         // Here we select files for incremental compaction.  
         // The rule is: if the largest(oldest) one is more than twice the 
         // size of the second, skip the largest, and continue to next...,
@@ -888,7 +892,7 @@
         if (LOG.isDebugEnabled()) {
           LOG.debug("Compaction size of " + this.storeNameStr + ": " +
             StringUtils.humanReadableInt(totalSize) + "; Skipped " + point +
-            " files , size: " + skipped);
+            " file(s), size: " + skipped);
         }
       }
 
@@ -904,7 +908,8 @@
       HStoreFile compactedOutputFile = new HStoreFile(conf, fs, 
           this.compactionDir,  this.info, family.getName(), -1L, null);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("started compaction of " + rdrs.size() + " files into " +
+        LOG.debug("Started compaction of " + rdrs.size() + " file(s)" +
+          (references? "(hasReferences=true)": " ") + " into " +
           FSUtils.getPath(compactedOutputFile.getMapFilePath()));
       }
       MapFile.Writer writer = compactedOutputFile.getWriter(this.fs,
@@ -917,15 +922,14 @@
       }
 
       // Now, write out an HSTORE_LOGINFOFILE for the brand-new TreeMap.
-      compactedOutputFile.writeInfo(fs, maxId);
+      compactedOutputFile.writeInfo(fs, maxId, doMajorCompaction);
 
       // Move the compaction into place.
       completeCompaction(filesToCompact, compactedOutputFile);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Completed compaction of " + this.storeNameStr +
-          " store size is " + StringUtils.humanReadableInt(storeSize) +
-          (doMajorCompaction? "": "; time since last major compaction: " +
-          (lastMajorCompaction/1000) + " seconds"));
+        LOG.debug("Completed " + (doMajorCompaction? "major": "") +
+          " compaction of " + this.storeNameStr +
+          " store size is " + StringUtils.humanReadableInt(storeSize));
       }
     }
     return checkSplit(forceSplit);
@@ -955,19 +959,40 @@
   /*
    * @return True if we should run a major compaction.
    */
-  private boolean isMajorCompaction() throws IOException {
+  boolean isMajorCompaction() throws IOException {
+    return isMajorCompaction(null);
+  }
+
+  /*
+   * @param filesToCompact Files to compact. Can be null.
+   * @return True if we should run a major compaction.
+   */
+  private boolean isMajorCompaction(final List<HStoreFile> filesToCompact)
+  throws IOException {
     boolean result = false;
     Path mapdir = HStoreFile.getMapDir(this.basedir, this.info.getEncodedName(),
         this.family.getName());
     long lowTimestamp = getLowestTimestamp(fs, mapdir);
     if (lowTimestamp < (System.currentTimeMillis() - this.majorCompactionTime) &&
         lowTimestamp > 0l) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Major compaction triggered on store: " +
-          this.storeNameStr + ". Time since last major compaction: " +
-          ((System.currentTimeMillis() - lowTimestamp)/1000) + " seconds");
+      // Major compaction time has elapsed.
+      long elapsedTime = System.currentTimeMillis() - lowTimestamp;
+      if (filesToCompact != null && filesToCompact.size() == 1 &&
+          filesToCompact.get(0).isMajorCompaction() &&
+          (this.ttl == HConstants.FOREVER || elapsedTime < this.ttl)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping major compaction because there is only one " +
+            "(major) compacted file and elapsedTime " + elapsedTime +
+            " is < ttl=" + this.ttl);
+        }
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Major compaction triggered on store: " +
+              this.storeNameStr + ". Time since last major compaction: " +
+              ((System.currentTimeMillis() - lowTimestamp)/1000) + " seconds");
+        }
+        result = true;
       }
-      result = true;
     }
     return result;
   }
@@ -1160,7 +1185,8 @@
     try {
       // 1. Moving the new MapFile into place.
       HStoreFile finalCompactedFile = new HStoreFile(conf, fs, basedir,
-        this.info, family.getName(), -1, null);
+        this.info, family.getName(), -1, null,
+        compactedFile.isMajorCompaction());
       if (LOG.isDebugEnabled()) {
         LOG.debug("moving " + FSUtils.getPath(compactedFile.getMapFilePath()) +
           " to " + FSUtils.getPath(finalCompactedFile.getMapFilePath()));

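Taken together, the HStore change keeps the existing period check but adds a guard: once the store consists of a single file that is itself the product of a major compaction and holds nothing old enough to expire, re-running the major compaction would only rewrite that same file, so it is skipped. A condensed restatement of the decision in isMajorCompaction(filesToCompact) (variable names follow the diff; majorCompactionTime comes from hbase.hregion.majorcompaction, ttl from the column family's time-to-live):

    // Condensed restatement of isMajorCompaction(filesToCompact) above.
    long elapsedTime = System.currentTimeMillis() - lowTimestamp;
    boolean periodElapsed = lowTimestamp > 0 && elapsedTime > majorCompactionTime;
    boolean singleMajorFile = filesToCompact != null && filesToCompact.size() == 1 &&
        filesToCompact.get(0).isMajorCompaction();
    boolean nothingToExpire = ttl == HConstants.FOREVER || elapsedTime < ttl;
    boolean result = periodElapsed && !(singleMajorFile && nothingToExpire);
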
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java?rev=714200&r1=714199&r2=714200&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/HStoreFile.java Fri Nov 14 16:36:27 2008
@@ -83,6 +83,7 @@
 public class HStoreFile implements HConstants {
   static final Log LOG = LogFactory.getLog(HStoreFile.class.getName());
   static final byte INFO_SEQ_NUM = 0;
+  static final byte MAJOR_COMPACTION = INFO_SEQ_NUM + 1;
   static final String HSTORE_DATFILE_DIR = "mapfiles";
   static final String HSTORE_INFO_DIR = "info";
   static final String HSTORE_FILTER_DIR = "filter";
@@ -97,6 +98,9 @@
   private final FileSystem fs;
   private final Reference reference;
   private final HRegionInfo hri;
+  /* If true, this file was the product of a major compaction.
+   */
+  private boolean majorCompaction = false;
 
   /**
    * Constructor that fully initializes the object
@@ -112,6 +116,24 @@
       final HRegionInfo hri, byte [] colFamily, long fileId,
       final Reference ref)
   throws IOException {
+    this(conf, fs, basedir, hri, colFamily, fileId, ref, false);
+  }
+  
+  /**
+   * Constructor that fully initializes the object
+   * @param conf Configuration object
+   * @param basedir qualified path that is parent of region directory
+   * @param colFamily name of the column family
+   * @param fileId file identifier
+   * @param ref Reference to another HStoreFile.
+   * @param hri The region info for this file (HACK HBASE-868). TODO: Fix.
+   * @param mc True if this file was the result of a major compaction.
+   * @throws IOException
+   */
+  HStoreFile(HBaseConfiguration conf, FileSystem fs, Path basedir,
+      final HRegionInfo hri, byte [] colFamily, long fileId,
+      final Reference ref, final boolean mc)
+  throws IOException {
     this.conf = conf;
     this.fs = fs;
     this.basedir = basedir;
@@ -133,6 +155,7 @@
     // If a reference, construction does not write the pointer files.  Thats
     // done by invocations of writeReferenceFiles(hsf, fs). Happens at split.
     this.reference = ref;
+    this.majorCompaction = mc;
   }
 
   /** @return the region name */
@@ -288,11 +311,11 @@
   /** 
    * Reads in an info file
    *
-   * @param fs file system
+   * @param filesystem file system
    * @return The sequence id contained in the info file
    * @throws IOException
    */
-  long loadInfo(FileSystem fs) throws IOException {
+  long loadInfo(final FileSystem filesystem) throws IOException {
     Path p = null;
     if (isReference()) {
       p = getInfoFilePath(reference.getEncodedRegionName(),
@@ -300,10 +323,18 @@
     } else {
       p = getInfoFilePath();
     }
-    DataInputStream in = new DataInputStream(fs.open(p));
+    long length = filesystem.getFileStatus(p).getLen();
+    boolean hasMoreThanSeqNum = length > (Byte.SIZE + Bytes.SIZEOF_LONG);
+    DataInputStream in = new DataInputStream(filesystem.open(p));
     try {
       byte flag = in.readByte();
-      if(flag == INFO_SEQ_NUM) {
+      if (flag == INFO_SEQ_NUM) {
+        if (hasMoreThanSeqNum) {
+          flag = in.readByte();
+          if (flag == MAJOR_COMPACTION) {
+            this.majorCompaction = in.readBoolean();
+          }
+        }
         return in.readLong();
       }
       throw new IOException("Cannot process log file: " + p);
@@ -315,16 +346,37 @@
   /**
    * Writes the file-identifier to disk
    * 
-   * @param fs file system
+   * @param filesystem file system
    * @param infonum file id
    * @throws IOException
    */
-  void writeInfo(FileSystem fs, long infonum) throws IOException {
+  void writeInfo(final FileSystem filesystem, final long infonum)
+  throws IOException {
+    writeInfo(filesystem, infonum, false);
+  }
+  
+  /**
+   * Writes the file-identifier to disk
+   * 
+   * @param filesystem file system
+   * @param infonum file id
+   * @param mc True if this file is product of a major compaction
+   * @throws IOException
+   */
+  void writeInfo(final FileSystem filesystem, final long infonum,
+    final boolean mc)
+  throws IOException {
     Path p = getInfoFilePath();
-    FSDataOutputStream out = fs.create(p);
+    FSDataOutputStream out = filesystem.create(p);
     try {
       out.writeByte(INFO_SEQ_NUM);
       out.writeLong(infonum);
+      if (mc) {
+        // Set whether major compaction flag on this file.
+        this.majorCompaction = mc;
+        out.writeByte(MAJOR_COMPACTION);
+        out.writeBoolean(mc);
+      }
     } finally {
       out.close();
     }
@@ -430,6 +482,13 @@
     return encodedRegionName + "/" + Bytes.toString(colFamily) + "/" + fileId +
       (isReference()? "-" + reference.toString(): "");
   }
+  
+  /**
+   * @return True if this file was made by a major compaction.
+   */
+  public boolean isMajorCompaction() {
+    return this.majorCompaction;
+  }
 
   private static String createHStoreFilename(final long fid,
       final int encodedRegionName) {

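For reference, the store-file info file now carries an optional trailing major-compaction marker after the sequence id; files written before this change simply end after the long, which is why loadInfo inspects the file length before trying to read anything beyond it. A minimal standalone sketch of the layout writeInfo produces (plain java.io streams stand in for the HDFS output stream):

    import java.io.DataOutputStream;
    import java.io.IOException;

    class InfoFileLayoutSketch {
      static final byte INFO_SEQ_NUM = 0;
      static final byte MAJOR_COMPACTION = INFO_SEQ_NUM + 1;

      // Mirrors writeInfo(filesystem, infonum, mc) from the diff above.
      static void write(DataOutputStream out, long seqId, boolean mc) throws IOException {
        out.writeByte(INFO_SEQ_NUM);
        out.writeLong(seqId);           // always present
        if (mc) {                       // optional trailing section
          out.writeByte(MAJOR_COMPACTION);
          out.writeBoolean(mc);
        }
      }
    }
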
Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java?rev=714200&r1=714199&r2=714200&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/regionserver/MemcacheFlusher.java Fri Nov 14 16:36:27 2008
@@ -177,7 +177,7 @@
       // compact if removeFromQueue is true. Note that region.flushCache()
       // only returns true if a flush is done and if a compaction is needed.
       if (region.flushcache() && !removeFromQueue) {
-        server.compactSplitThread.compactionRequested(region);
+        server.compactSplitThread.compactionRequested(region, getName());
       }
     } catch (DroppedSnapshotException ex) {
       // Cache flush can fail in a few places. If it fails in a critical

Modified: hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Threads.java
URL: http://svn.apache.org/viewvc/hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Threads.java?rev=714200&r1=714199&r2=714200&view=diff
==============================================================================
--- hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Threads.java (original)
+++ hadoop/hbase/trunk/src/java/org/apache/hadoop/hbase/util/Threads.java Fri Nov 14 16:36:27 2008
@@ -57,4 +57,18 @@
     t.start();
     return t;
   }
-}
\ No newline at end of file
+
+  /**
+   * Shutdown passed thread using isAlive and join.
+   * @param t Thread to shutdown
+   */
+  public static void shutdown(final Thread t) {
+    while (t.isAlive()) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        LOG.warn(t.getName(), e);
+      }
+    }
+  }
+}