Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:17:36 UTC

svn commit: r1181535 - in /hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver: CompactSplitThread.java HRegion.java HRegionServer.java MemStoreFlusher.java SplitRequest.java Store.java compactions/CompactionRequest.java

Author: nspiegelberg
Date: Tue Oct 11 02:17:36 2011
New Revision: 1181535

URL: http://svn.apache.org/viewvc?rev=1181535&view=rev
Log:
Multithreaded Compactions

Summary:
We're seeing that major compactions take much longer than flushes, so the
number of StoreFiles grows very large while a major compaction is running.
The immediate fix is to rolling-split the regions; the long-term fix is to
run multiple compaction threads so the StoreFile count stays low while one
thread is busy processing a major.
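
The thread counts and the large/small pool split are controlled by the new
configuration keys read in CompactSplitThread. As a rough sketch only (the
key names come from this diff; the values are made-up examples, and per the
Preconditions check the throttle size must be set whenever the small pool is
enabled), an hbase-site.xml entry might look like:

    <property>
      <name>hbase.regionserver.thread.compaction.large</name>
      <value>1</value>  <!-- threads for compactions at or above the throttle size -->
    </property>
    <property>
      <name>hbase.regionserver.thread.compaction.small</name>
      <value>1</value>  <!-- optional second pool; 0 (default) disables it -->
    </property>
    <property>
      <name>hbase.regionserver.thread.compaction.throttle</name>
      <value>1073741824</value>  <!-- example: requests under 1GB go to the small pool -->
    </property>
    <property>
      <name>hbase.regionserver.thread.split</name>
      <value>1</value>  <!-- threads for the split executor -->
    </property>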

Test Plan:
- mvn test -Dtest=TestCompactSelection

Reviewed By: kannan
Reviewers: jgray, dhruba, aaiyer, kannan
Commenters: jgray
CC: hbase@lists, jgray, nspiegelberg, kannan
Differential Revision: 225879

Added:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1181535&r1=1181534&r2=1181535&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Tue Oct 11 02:17:36 2011
@@ -22,39 +22,30 @@ package org.apache.hadoop.hbase.regionse
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.util.StringUtils;
 
-import java.io.IOException;
-import java.util.concurrent.BlockingQueue;
+import com.google.common.base.Preconditions;
+
+import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Compact region on request and then run split if appropriate
  */
-class CompactSplitThread extends Thread {
+public class CompactSplitThread {
   static final Log LOG = LogFactory.getLog(CompactSplitThread.class);
 
-  private HTable root = null;
-  private HTable meta = null;
-  private final long frequency;
-  private final ReentrantLock lock = new ReentrantLock();
-
   private final HRegionServer server;
   private final Configuration conf;
 
-  protected final BlockingQueue<CompactionRequest> compactionQueue =
-    new PriorityBlockingQueue<CompactionRequest>();
+  private final ThreadPoolExecutor largeCompactions;
+  private final ThreadPoolExecutor smallCompactions;
+  private final ThreadPoolExecutor splits;
+  private final long throttleSize;
 
   /* The default priority for user-specified compaction requests.
    * The user gets top priority unless we have blocking compactions (Pri <= 0)
@@ -63,91 +54,66 @@ class CompactSplitThread extends Thread 
   public static final int NO_PRIORITY = Integer.MIN_VALUE;
 
   /** @param server */
-  public CompactSplitThread(HRegionServer server) {
+  CompactSplitThread(HRegionServer server) {
     super();
     this.server = server;
     this.conf = server.conf;
-    this.frequency =
-      conf.getLong("hbase.regionserver.thread.splitcompactcheckfrequency",
-      20 * 1000);
+
+    int largeThreads = Math.max(1, conf.getInt(
+        "hbase.regionserver.thread.compaction.large", 1));
+    int smallThreads = conf.getInt(
+        "hbase.regionserver.thread.compaction.small", 0);
+    throttleSize = conf.getLong(
+        "hbase.regionserver.thread.compaction.throttle", 0);
+    int splitThreads = conf.getInt("hbase.regionserver.thread.split", 1);
+
+    // if we have throttle threads, make sure the user also specified size
+    Preconditions.checkArgument(smallThreads == 0 || throttleSize > 0);
+
+    this.largeCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
+        60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
+    this.largeCompactions
+        .setRejectedExecutionHandler(new CompactionRequest.Rejection());
+    this.smallCompactions = (smallThreads <= 0) ? null
+        : new ThreadPoolExecutor(smallThreads, smallThreads, 60,
+          TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>());
+    this.smallCompactions
+        .setRejectedExecutionHandler(new CompactionRequest.Rejection());
+    this.splits = (ThreadPoolExecutor) Executors
+        .newFixedThreadPool(splitThreads);
   }
 
   @Override
-  public void run() {
-    while (!this.server.isStopRequested()) {
-      CompactionRequest compactionRequest = null;
-      HRegion r = null;
-      boolean completed = false;
-      try {
-        compactionRequest = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
-        if (compactionRequest != null) {
-          r = compactionRequest.getHRegion();
-          lock.lock();
-          try {
-            // look for a split first
-            if(!this.server.isStopRequested()) {
-              // don't split regions that are blocking
-              if (r.getCompactPriority() >= PRIORITY_USER) {
-                byte[] midkey = compactionRequest.getStore().checkSplit();
-                if (midkey != null) {
-                  split(r, midkey);
-                  continue;
-                }
-              }
-            }
-
-            // now test for compaction
-            if (!this.server.isStopRequested()) {
-              long startTime = EnvironmentEdgeManager.currentTimeMillis();
-              completed = r.compact(compactionRequest);
-              long now = EnvironmentEdgeManager.currentTimeMillis();
-              LOG.info(((completed) ? "completed" : "aborted")
-                  + " compaction: " + compactionRequest + ", duration="
-                  + StringUtils.formatTimeDiff(now, startTime));
-              if (completed) { // compaction aborted?
-                this.server.getMetrics().
-                  addCompaction(now - startTime, compactionRequest.getSize());
-              }
-              if (LOG.isDebugEnabled()) {
-                CompactionRequest next = this.compactionQueue.peek();
-                LOG.debug("Just finished a compaction. " +
-                          " Current Compaction Queue: size=" +
-                          getCompactionQueueSize() +
-                          ((next != null) ?
-                              ", topPri=" + next.getPriority() : ""));
-              }
-            }
-          } finally {
-            lock.unlock();
-          }
-        }
-      } catch (InterruptedException ex) {
-        continue;
-      } catch (IOException ex) {
-        LOG.error("Compaction/Split failed " + compactionRequest,
-          RemoteExceptionHandler.checkIOException(ex));
-        if (!server.checkFileSystem()) {
-          break;
-        }
-      } catch (Exception ex) {
-        LOG.error("Compaction failed " + compactionRequest, ex);
-        if (!server.checkFileSystem()) {
-          break;
-        }
-      } finally {
-        if (compactionRequest != null) {
-          Store s = compactionRequest.getStore();
-          s.finishRequest(compactionRequest);
-          // degenerate case: blocked regions require recursive enqueues
-          if (s.getCompactPriority() < PRIORITY_USER && completed) {
-            requestCompaction(r, s, "Recursive enqueue");
-          }
-        }
-        compactionRequest = null;
+  public String toString() {
+    return "compaction_queue="
+        + (smallCompactions != null ? "("
+            + largeCompactions.getQueue().size() + ":"
+            + smallCompactions.getQueue().size() + ")"
+            : largeCompactions.getQueue().size())
+        + ", split_queue=" + splits.getQueue().size();
+  }
+
+  public synchronized boolean requestSplit(final HRegion r) {
+    // don't split regions that are blocking
+    if (r.getCompactPriority() >= PRIORITY_USER) {
+      byte[] midKey = r.checkSplit();
+      if (midKey != null) {
+        requestSplit(r, midKey);
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public synchronized void requestSplit(final HRegion r, byte[] midKey) {
+    try {
+      this.splits.execute(new SplitRequest(r, midKey));
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Split requested for " + r + ".  " + this);
       }
+    } catch (RejectedExecutionException ree) {
+      LOG.info("Could not execute split for " + r, ree);
     }
-    compactionQueue.clear();
-    LOG.info(getName() + " exiting");
   }
 
   /**
@@ -175,15 +141,13 @@ class CompactSplitThread extends Thread 
 
   /**
    * @param r HRegion store belongs to
-   * @param force Whether next compaction should be major
+   * @param s Store to request compaction on
    * @param why Why compaction requested -- used in debug messages
    * @param priority override the default priority (NO_PRIORITY == decide)
    */
   public synchronized void requestCompaction(final HRegion r, final Store s,
       final String why, int priority) {
 
-    boolean addedToQueue = false;
-
     if (this.server.stopRequested.get()) {
       return;
     }
@@ -193,107 +157,54 @@ class CompactSplitThread extends Thread 
       if (priority != NO_PRIORITY) {
         cr.setPriority(priority);
       }
-      addedToQueue = compactionQueue.add(cr);
-      if (!addedToQueue) {
-        LOG.error("Could not add request to compaction queue: " + cr);
-        s.finishRequest(cr);
-      } else if (LOG.isDebugEnabled()) {
-        LOG.debug("Compaction requested: " + cr
+      ThreadPoolExecutor pool = largeCompactions;
+      if (smallCompactions != null && throttleSize > cr.getSize()) {
+        // smallCompactions is like the 10 items or less line at Walmart
+        pool = smallCompactions;
+      }
+      pool.execute(cr);
+      if (LOG.isDebugEnabled()) {
+        String type = "";
+        if (smallCompactions != null) {
+          type = (pool == smallCompactions) ? "Small " : "Large ";
+        }
+        LOG.debug(type + "Compaction requested: " + cr
             + (why != null && !why.isEmpty() ? "; Because: " + why : "")
-            + "; Priority: " + priority + "; Compaction queue size: "
-            + compactionQueue.size());
-      }
-    }
-  }
-
-  private void split(final HRegion region, final byte [] midKey)
-  throws IOException {
-    final HRegionInfo oldRegionInfo = region.getRegionInfo();
-    final long startTime = System.currentTimeMillis();
-    final HRegion[] newRegions = region.splitRegion(midKey);
-    if (newRegions == null) {
-      // Didn't need to be split
-      return;
-    }
-
-    // When a region is split, the META table needs to updated if we're
-    // splitting a 'normal' region, and the ROOT table needs to be
-    // updated if we are splitting a META region.
-    HTable t = null;
-    if (region.getRegionInfo().isMetaTable()) {
-      // We need to update the root region
-      if (this.root == null) {
-        this.root = new HTable(conf, HConstants.ROOT_TABLE_NAME);
+            + "; " + this);
       }
-      t = root;
-    } else {
-      // For normal regions we need to update the meta region
-      if (meta == null) {
-        meta = new HTable(conf, HConstants.META_TABLE_NAME);
-      }
-      t = meta;
-    }
-
-    // Mark old region as offline and split in META.
-    // NOTE: there is no need for retry logic here. HTable does it for us.
-    oldRegionInfo.setOffline(true);
-    oldRegionInfo.setSplit(true);
-    // Inform the HRegionServer that the parent HRegion is no-longer online.
-    this.server.removeFromOnlineRegions(oldRegionInfo);
-
-    Put put = new Put(oldRegionInfo.getRegionName());
-    put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
-      Writables.getBytes(oldRegionInfo));
-    put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
-        HConstants.EMPTY_BYTE_ARRAY);
-    put.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
-        HConstants.EMPTY_BYTE_ARRAY);
-    put.add(HConstants.CATALOG_FAMILY, HConstants.SPLITA_QUALIFIER,
-      Writables.getBytes(newRegions[0].getRegionInfo()));
-    put.add(HConstants.CATALOG_FAMILY, HConstants.SPLITB_QUALIFIER,
-      Writables.getBytes(newRegions[1].getRegionInfo()));
-    t.put(put);
-
-    // If we crash here, then the daughters will not be added and we'll have
-    // and offlined parent but no daughters to take up the slack.  hbase-2244
-    // adds fixup to the metascanners.
-
-    // Add new regions to META
-    for (int i = 0; i < newRegions.length; i++) {
-      put = new Put(newRegions[i].getRegionName());
-      put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
-          Writables.getBytes(newRegions[i].getRegionInfo()));
-      t.put(put);
     }
-
-    // If we crash here, the master will not know of the new daughters and they
-    // will not be assigned.  The metascanner when it runs will notice and take
-    // care of assigning the new daughters.
-
-    // Now tell the master about the new regions
-    server.reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
-      newRegions[1].getRegionInfo());
-
-    LOG.info("region split, META updated, and report to master all" +
-      " successful. Old region=" + oldRegionInfo.toString() +
-      ", new regions: " + newRegions[0].toString() + ", " +
-      newRegions[1].toString() + ". Split took " +
-      StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
   }
 
   /**
    * Only interrupt once it's done with a run through the work loop.
    */
   void interruptIfNecessary() {
-    if (lock.tryLock()) {
+    splits.shutdown();
+    largeCompactions.shutdown();
+    if (smallCompactions != null)
+      smallCompactions.shutdown();
+  }
+
+  private void waitFor(ThreadPoolExecutor t, String name) {
+    boolean done = false;
+    while (!done) {
       try {
-        this.interrupt();
-      } finally {
-        lock.unlock();
+        done = t.awaitTermination(60, TimeUnit.SECONDS);
+        LOG.debug("Waiting for " + name + " to finish...");
+      } catch (InterruptedException ie) {
+        LOG.debug("Interrupted waiting for " + name + " to finish...");
       }
     }
   }
 
+  void join() {
+    waitFor(splits, "Split Thread");
+    waitFor(largeCompactions, "Large Compaction Thread");
+    if (smallCompactions != null) {
+      waitFor(smallCompactions, "Small Compaction Thread");
+    }
+  }
+
   /**
    * Returns the current size of the queue containing regions that are
    * processed.
@@ -301,6 +212,9 @@ class CompactSplitThread extends Thread 
    * @return The current size of the regions queue.
    */
   public int getCompactionQueueSize() {
-    return compactionQueue.size();
+    int size = largeCompactions.getQueue().size();
+    if (smallCompactions != null)
+      size += smallCompactions.getQueue().size();
+    return size;
   }
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1181535&r1=1181534&r2=1181535&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 11 02:17:36 2011
@@ -3367,6 +3367,20 @@ public class HRegion implements HeapSize
     this.splitPoint = sp;
   }
 
+  byte[] checkSplit() {
+    if (this.splitPoint != null) {
+      return this.splitPoint;
+    }
+    byte[] splitPoint = null;
+    for (Store s : stores.values()) {
+      splitPoint = s.checkSplit();
+      if (splitPoint != null) {
+        return splitPoint;
+      }
+    }
+    return null;
+  }
+
   /**
    * @return The priority that this region should have in the compaction queue
    */

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1181535&r1=1181534&r2=1181535&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Oct 11 02:17:36 2011
@@ -209,7 +209,7 @@ public class HRegionServer implements HR
   private RegionServerDynamicMetrics dynamicMetrics;
 
   // Compactions
-  CompactSplitThread compactSplitThread;
+  public CompactSplitThread compactSplitThread;
 
   // Cache flushing
   MemStoreFlusher cacheFlusher;
@@ -865,7 +865,7 @@ public class HRegionServer implements HR
    *
    * @return false if file system is not available
    */
-  protected boolean checkFileSystem() {
+  public boolean checkFileSystem() {
     if (this.fsOk && this.fs != null) {
       try {
         FSUtils.checkFileSystemAvailable(this.fs);
@@ -1036,8 +1036,6 @@ public class HRegionServer implements HR
         handler);
     Threads.setDaemonThreadRunning(this.cacheFlusher, n + ".cacheFlusher",
       handler);
-    Threads.setDaemonThreadRunning(this.compactSplitThread, n + ".compactor",
-        handler);
     Threads.setDaemonThreadRunning(this.workerThread, n + ".worker", handler);
     Threads.setDaemonThreadRunning(this.majorCompactionChecker,
         n + ".majorCompactionChecker", handler);
@@ -1094,7 +1092,7 @@ public class HRegionServer implements HR
       return false;
     }
     // Verify that all threads are alive
-    if (!(leases.isAlive() && compactSplitThread.isAlive() &&
+    if (!(leases.isAlive() &&
         cacheFlusher.isAlive() && hlogRoller.isAlive() &&
         workerThread.isAlive() && this.majorCompactionChecker.isAlive())) {
       // One or more threads are no longer alive - shut down
@@ -1196,8 +1194,8 @@ public class HRegionServer implements HR
     Threads.shutdown(this.majorCompactionChecker);
     Threads.shutdown(this.workerThread);
     Threads.shutdown(this.cacheFlusher);
-    Threads.shutdown(this.compactSplitThread);
     Threads.shutdown(this.hlogRoller);
+    this.compactSplitThread.join();
     this.replicationHandler.join();
   }
 
@@ -1365,12 +1363,7 @@ public class HRegionServer implements HR
               region.flushcache();
               region.triggerSplit();
               region.setSplitPoint(info.getSplitPoint());
-              // force a compaction; split will be side-effect.
-              // TODO: remove this. no correlation between compaction & split
-              // other than (1) references & (2) CompactSplitThread couples them
-              compactSplitThread.requestCompaction(region,
-                e.msg.getType().name(),
-                CompactSplitThread.PRIORITY_USER);
+              compactSplitThread.requestSplit(region, region.checkSplit());
               break;
 
             case MSG_REGION_MAJOR_COMPACT:

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1181535&r1=1181534&r2=1181535&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Tue Oct 11 02:17:36 2011
@@ -212,7 +212,9 @@ class MemStoreFlusher extends Thread imp
           LOG.warn("Region " + region.getRegionNameAsString() + " has too many " +
             "store files; delaying flush up to " + this.blockingWaitTime + "ms");
         }
-        this.server.compactSplitThread.requestCompaction(region, getName());
+        if (!this.server.compactSplitThread.requestSplit(region)) {
+          this.server.compactSplitThread.requestCompaction(region, getName());
+        }
         // Put back on the queue.  Have it come back out of the queue
         // after a delay of this.blockingWaitTime / 100 ms.
         this.flushQueue.add(fqe.requeue(this.blockingWaitTime / 100));

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java?rev=1181535&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java Tue Oct 11 02:17:36 2011
@@ -0,0 +1,121 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Writables;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Handles processing region splits. Put in a queue, owned by HRegionServer.
+ */
+class SplitRequest implements Runnable {
+  static final Log LOG = LogFactory.getLog(SplitRequest.class);
+  private final HRegion region;
+  private final byte[] midKey;
+  private static HTable root = null;
+  private static HTable meta = null;
+
+  SplitRequest(HRegion region, byte[] midKey) {
+    this.region = region;
+    this.midKey = midKey;
+  }
+
+  @Override
+  public String toString() {
+    return "regionName=" + region + ", midKey=" + Bytes.toStringBinary(midKey);
+  }
+
+  @Override
+  public void run() {
+    HRegionServer server = region.getRegionServer();
+    try {
+      Configuration conf = region.getConf();
+      final HRegionInfo oldRegionInfo = region.getRegionInfo();
+      final long startTime = System.currentTimeMillis();
+      final HRegion[] newRegions = region.splitRegion(midKey);
+      if (newRegions == null) {
+        // Didn't need to be split
+        return;
+      }
+
+      // When a region is split, the META table needs to updated if we're
+      // splitting a 'normal' region, and the ROOT table needs to be
+      // updated if we are splitting a META region.
+      HTable t = null;
+      if (region.getRegionInfo().isMetaTable()) {
+        // We need to update the root region
+        if (root == null) {
+          root = new HTable(conf, HConstants.ROOT_TABLE_NAME);
+        }
+        t = root;
+      } else {
+        // For normal regions we need to update the meta region
+        if (meta == null) {
+          meta = new HTable(conf, HConstants.META_TABLE_NAME);
+        }
+        t = meta;
+      }
+
+      // Mark old region as offline and split in META.
+      // NOTE: there is no need for retry logic here. HTable does it for us.
+      oldRegionInfo.setOffline(true);
+      oldRegionInfo.setSplit(true);
+      // Inform the HRegionServer that the parent HRegion is no-longer online.
+      server.removeFromOnlineRegions(oldRegionInfo);
+
+      Put put = new Put(oldRegionInfo.getRegionName());
+      put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+          Writables.getBytes(oldRegionInfo));
+      put.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER,
+          HConstants.EMPTY_BYTE_ARRAY);
+      put.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER,
+          HConstants.EMPTY_BYTE_ARRAY);
+      put.add(HConstants.CATALOG_FAMILY, HConstants.SPLITA_QUALIFIER, Writables
+          .getBytes(newRegions[0].getRegionInfo()));
+      put.add(HConstants.CATALOG_FAMILY, HConstants.SPLITB_QUALIFIER, Writables
+          .getBytes(newRegions[1].getRegionInfo()));
+      t.put(put);
+
+      // If we crash here, then the daughters will not be added and we'll have
+      // and offlined parent but no daughters to take up the slack. hbase-2244
+      // adds fixup to the metascanners.
+
+      // Add new regions to META
+      for (int i = 0; i < newRegions.length; i++) {
+        put = new Put(newRegions[i].getRegionName());
+        put.add(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER,
+            Writables.getBytes(newRegions[i].getRegionInfo()));
+        t.put(put);
+      }
+
+      // If we crash here, the master will not know of the new daughters and they
+      // will not be assigned. The metascanner when it runs will notice and take
+      // care of assigning the new daughters.
+
+      // Now tell the master about the new regions
+      server.reportSplit(oldRegionInfo, newRegions[0].getRegionInfo(),
+          newRegions[1].getRegionInfo());
+
+      LOG.info("region split, META updated, and report to master all"
+          + " successful. Old region=" + oldRegionInfo.toString()
+          + ", new regions: " + newRegions[0].toString() + ", "
+          + newRegions[1].toString() + ". Split took "
+          + StringUtils.formatTimeDiff(System.currentTimeMillis(), startTime));
+      LOG.debug("Compaction/Split Status: " + server.compactSplitThread);
+    } catch (IOException ex) {
+      LOG.error("Split failed " + this, RemoteExceptionHandler
+          .checkIOException(ex));
+      server.checkFileSystem();
+    }
+  }
+
+}
\ No newline at end of file

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181535&r1=1181534&r2=1181535&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 02:17:36 2011
@@ -656,7 +656,7 @@ public class Store implements HeapSize {
           StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
           int idx = filesToCompact.indexOf(last);
           Preconditions.checkArgument(idx != -1);
-          filesToCompact = filesToCompact.subList(idx+1, filesToCompact.size());
+          filesToCompact.subList(0, idx + 1).clear();
         }
         int count = filesToCompact.size();
         if (N > count) {
@@ -818,7 +818,7 @@ public class Store implements HeapSize {
           StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
           int idx = candidates.indexOf(last);
           Preconditions.checkArgument(idx != -1);
-          candidates = candidates.subList(idx + 1, candidates.size());
+          candidates.subList(0, idx + 1).clear();
         }
         List<StoreFile> filesToCompact = compactSelection(candidates);
 
@@ -935,6 +935,11 @@ public class Store implements HeapSize {
             }));
       }
 
+      // skip selection algorithm if we don't have enough files
+      if (filesToCompact.size() < this.minFilesToCompact) {
+        return Collections.emptyList();
+      }
+
       /* TODO: add sorting + unit test back in when HBASE-2856 is fixed
       //sort files by size to correct when normal skew is altered by bulk load
       Collections.sort(filesToCompact, StoreFile.Comparators.FILE_SIZE);
@@ -1349,7 +1354,7 @@ public class Store implements HeapSize {
    * Determines if Store should be split
    * @return byte[] if store should be split, null otherwise.
    */
-  byte[] checkSplit() {
+  public byte[] checkSplit() {
     this.lock.readLock().lock();
     try {
       boolean force = this.region.shouldSplit();

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1181535&r1=1181534&r2=1181535&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Tue Oct 11 02:17:36 2011
@@ -1,19 +1,31 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
+import java.io.IOException;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.RejectedExecutionHandler;
+import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.util.StringUtils;
 
-  /**
-   * This class represents a compaction request and holds the region, priority,
-   * and time submitted.
-   */
-  public class CompactionRequest implements Comparable<CompactionRequest> {
+import com.google.common.base.Function;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ * This class holds all details necessary to run a compaction.
+ */
+public class CompactionRequest implements Comparable<CompactionRequest>,
+    Runnable {
     static final Log LOG = LogFactory.getLog(CompactionRequest.class);
     private final HRegion r;
     private final Store s;
@@ -23,19 +35,10 @@ import org.apache.hadoop.hbase.regionser
     private int p;
     private final Date date;
 
-    public CompactionRequest(HRegion r, Store s) {
-      this(r, s, null, false, s.getCompactPriority());
-    }
-
-    public CompactionRequest(HRegion r, Store s, int p) {
-      this(r, s, null, false, p);
-    }
-
     public CompactionRequest(HRegion r, Store s,
         List<StoreFile> files, boolean isMajor, int p) {
-      if (r == null) {
-        throw new NullPointerException("HRegion cannot be null");
-      }
+      Preconditions.checkNotNull(r);
+      Preconditions.checkNotNull(files);
 
       this.r = r;
       this.s = s;
@@ -117,10 +120,68 @@ import org.apache.hadoop.hbase.regionser
       this.p = p;
     }
 
+  @Override
     public String toString() {
+      String fsList = Joiner.on(", ").join(Lists.transform(files,
+        new Function<StoreFile, String>() {
+          public String apply(StoreFile sf) {
+            return StringUtils.humanReadableInt(sf.getReader().length());
+          }
+        }));
+
       return "regionName=" + r.getRegionNameAsString() +
         ", storeName=" + new String(s.getFamily().getName()) +
         ", fileCount=" + files.size() +
+        ", fileSize=" + StringUtils.humanReadableInt(totalSize) +
+          " (" + fsList + ")" +
         ", priority=" + p + ", date=" + date;
     }
+
+    @Override
+    public void run() {
+      HRegionServer server = this.r.getRegionServer();
+      if (server.isStopRequested()) {
+        return;
+      }
+      try {
+        long startTime = EnvironmentEdgeManager.currentTimeMillis();
+        boolean completed = r.compact(this);
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        LOG.info(((completed) ? "completed" : "aborted") + " compaction: " + this
+            + ", duration=" + StringUtils.formatTimeDiff(now, startTime));
+        if (completed) {
+          server.getMetrics().addCompaction(now - startTime, this.totalSize);
+          // degenerate case: blocked regions require recursive enqueues
+          if (s.getCompactPriority() <= 0) {
+            server.compactSplitThread
+              .requestCompaction(r, s, "Recursive enqueue");
+          }
+        }
+      } catch (IOException ex) {
+        LOG.error("Compaction failed " + this, RemoteExceptionHandler
+            .checkIOException(ex));
+        server.checkFileSystem();
+      } catch (Exception ex) {
+        LOG.error("Compaction failed " + this, ex);
+        server.checkFileSystem();
+      } finally {
+        s.finishRequest(this);
+        LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
+      }
+    }
+
+    /**
+     * Cleanup class to use when rejecting a compaction request from the queue.
+     */
+    public static class Rejection implements RejectedExecutionHandler {
+
+      @Override
+      public void rejectedExecution(Runnable request, ThreadPoolExecutor pool) {
+        if (request instanceof CompactionRequest) {
+          CompactionRequest cr = (CompactionRequest) request;
+          LOG.debug("Compaction Rejected: " + cr);
+          cr.getStore().finishRequest(cr);
+        }
+      }
+    }
   }