You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:17:27 UTC

svn commit: r1181534 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/compactions/ main/java/org/apache/hadoop/hbase/regionserver/metrics/ test/java/org/apache/hadoop/hbase/re...

Author: nspiegelberg
Date: Tue Oct 11 02:17:27 2011
New Revision: 1181534

URL: http://svn.apache.org/viewvc?rev=1181534&view=rev
Log:
StoreFile Level Compaction Locking

Summary:
Multithreaded compactions (HBASE-1476) will solve the problem
of major compactions clogging high-priority minor compactions. However,
there is still a problem here. Since compactions are store-level, the
store undergoing major compaction will have it's storefile count
increase during the major. We really need a way to allow multiple
outstanding compactions per store. compactSelection() should
lock/reserve the files being used for compaction. This will also allow
us to know what we're going to compact when inserting into the
CompactSplitThread and make more informed priority queueing
decisions.

Test Plan:
- mvn test -Dtest=TestCompaction
 - mvn test (ongoing)

Reviewed By: kranganathan
Reviewers: kannan, kranganathan, liyintang, jgray
Commenters: kannan
CC: hbase@lists, nspiegelberg, kranganathan, kannan
Differential Revision: 238799

Removed:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/PriorityCompactionQueue.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestPriorityCompactionQueue.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1181534&r1=1181533&r2=1181534&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Tue Oct 11 02:17:27 2011
@@ -28,11 +28,14 @@ import org.apache.hadoop.hbase.RemoteExc
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.util.StringUtils;
 
 import java.io.IOException;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -50,13 +53,14 @@ class CompactSplitThread extends Thread 
   private final HRegionServer server;
   private final Configuration conf;
 
-  private final PriorityCompactionQueue compactionQueue =
-    new PriorityCompactionQueue();
+  protected final BlockingQueue<CompactionRequest> compactionQueue =
+    new PriorityBlockingQueue<CompactionRequest>();
 
   /* The default priority for user-specified compaction requests.
    * The user gets top priority unless we have blocking compactions (Pri <= 0)
    */
   public static final int PRIORITY_USER = 1;
+  public static final int NO_PRIORITY = Integer.MIN_VALUE;
 
   /** @param server */
   public CompactSplitThread(HRegionServer server) {
@@ -73,17 +77,36 @@ class CompactSplitThread extends Thread 
     while (!this.server.isStopRequested()) {
       CompactionRequest compactionRequest = null;
       HRegion r = null;
+      boolean completed = false;
       try {
         compactionRequest = compactionQueue.poll(this.frequency, TimeUnit.MILLISECONDS);
         if (compactionRequest != null) {
+          r = compactionRequest.getHRegion();
           lock.lock();
           try {
+            // look for a split first
             if(!this.server.isStopRequested()) {
-              // Don't interrupt us while we are working
-              r = compactionRequest.getHRegion();
-              byte [] midKey = r.compactStore(compactionRequest.getStore());
-              if (r.getLastCompactInfo() != null) {  // compaction aborted?
-                this.server.getMetrics().addCompaction(r.getLastCompactInfo());
+              // don't split regions that are blocking
+              if (r.getCompactPriority() >= PRIORITY_USER) {
+                byte[] midkey = compactionRequest.getStore().checkSplit();
+                if (midkey != null) {
+                  split(r, midkey);
+                  continue;
+                }
+              }
+            }
+
+            // now test for compaction
+            if (!this.server.isStopRequested()) {
+              long startTime = EnvironmentEdgeManager.currentTimeMillis();
+              completed = r.compact(compactionRequest);
+              long now = EnvironmentEdgeManager.currentTimeMillis();
+              LOG.info(((completed) ? "completed" : "aborted")
+                  + " compaction: " + compactionRequest + ", duration="
+                  + StringUtils.formatTimeDiff(now, startTime));
+              if (completed) { // compaction aborted?
+                this.server.getMetrics().
+                  addCompaction(now - startTime, compactionRequest.getSize());
               }
               if (LOG.isDebugEnabled()) {
                 CompactionRequest next = this.compactionQueue.peek();
@@ -93,25 +116,6 @@ class CompactSplitThread extends Thread 
                           ((next != null) ?
                               ", topPri=" + next.getPriority() : ""));
               }
-              if (!this.server.isStopRequested()) {
-                // requests that were added during compaction will have a
-                // stale priority. remove and re-insert to update priority
-                boolean hadCompaction = compactionQueue.remove(compactionRequest);
-                if (midKey != null) {
-                  split(r, midKey);
-                } else if (hadCompaction) {
-                  // recompute the priority for a request already in the queue
-                  LOG.debug("Re-computing priority for compaction request " + compactionRequest);
-                  compactionRequest.setPriority(compactionRequest.getStore().getCompactPriority());
-                  compactionQueue.add(compactionRequest);
-                } else if (compactionRequest.getStore().getCompactPriority() < PRIORITY_USER) {
-                  // degenerate case. recursively enqueue blocked regions
-                  LOG.debug("Re-queueing with " + compactionRequest.getStore().getCompactPriority() +
-                      " priority for compaction request " + compactionRequest);
-                  compactionRequest.setPriority(compactionRequest.getStore().getCompactPriority());
-                  compactionQueue.add(compactionRequest);
-                }
-              }
             }
           } finally {
             lock.unlock();
@@ -120,19 +124,26 @@ class CompactSplitThread extends Thread 
       } catch (InterruptedException ex) {
         continue;
       } catch (IOException ex) {
-        LOG.error("Compaction/Split failed for region " +
-            r.getRegionNameAsString(),
+        LOG.error("Compaction/Split failed " + compactionRequest,
           RemoteExceptionHandler.checkIOException(ex));
         if (!server.checkFileSystem()) {
           break;
         }
       } catch (Exception ex) {
-        LOG.error("Compaction failed" +
-            (r != null ? (" for region " + r.getRegionNameAsString()) : ""),
-            ex);
+        LOG.error("Compaction failed " + compactionRequest, ex);
         if (!server.checkFileSystem()) {
           break;
         }
+      } finally {
+        if (compactionRequest != null) {
+          Store s = compactionRequest.getStore();
+          s.finishRequest(compactionRequest);
+          // degenerate case: blocked regions require recursive enqueues
+          if (s.getCompactPriority() < PRIORITY_USER && completed) {
+            requestCompaction(r, s, "Recursive enqueue");
+          }
+        }
+        compactionRequest = null;
       }
     }
     compactionQueue.clear();
@@ -146,19 +157,19 @@ class CompactSplitThread extends Thread 
   public synchronized void requestCompaction(final HRegion r,
       final String why) {
     for(Store s : r.getStores().values()) {
-      requestCompaction(r, s, false, why, s.getCompactPriority());
+      requestCompaction(r, s, why, NO_PRIORITY);
     }
   }
 
-  public synchronized void requestCompaction(final HRegion r,
-      final String why, int p) {
-    requestCompaction(r, false, why, p);
+  public synchronized void requestCompaction(final HRegion r, final Store s,
+      final String why) {
+    requestCompaction(r, s, why, NO_PRIORITY);
   }
 
-  public synchronized void requestCompaction(final HRegion r,
-      final boolean force, final String why, int p) {
+  public synchronized void requestCompaction(final HRegion r, final String why,
+      int p) {
     for(Store s : r.getStores().values()) {
-      requestCompaction(r, s, force, why, p);
+      requestCompaction(r, s, why, p);
     }
   }
 
@@ -166,9 +177,10 @@ class CompactSplitThread extends Thread 
    * @param r HRegion store belongs to
    * @param force Whether next compaction should be major
    * @param why Why compaction requested -- used in debug messages
+   * @param priority override the default priority (NO_PRIORITY == decide)
    */
   public synchronized void requestCompaction(final HRegion r, final Store s,
-      final boolean force, final String why, int priority) {
+      final String why, int priority) {
 
     boolean addedToQueue = false;
 
@@ -176,20 +188,21 @@ class CompactSplitThread extends Thread 
       return;
     }
 
-    // tell the region to major-compact (and don't downgrade it)
-    if (force) {
-      s.setForceMajorCompaction(force);
-    }
-    CompactionRequest compactionRequest = new CompactionRequest(r, s, priority);
-    addedToQueue = compactionQueue.add(compactionRequest);
-    // only log if actually added to compaction queue...
-    if (addedToQueue && LOG.isDebugEnabled()) {
-      LOG.debug("Compaction " + (force? "(major) ": "") +
-        "requested for region " + r.getRegionNameAsString() +
-        "/" + r.getRegionInfo().getEncodedName() +
-        ", store " + s +
-        (why != null && !why.isEmpty()? " because: " + why: "") +
-        "; Priority: " + priority + "; Compaction queue size: " + compactionQueue.size());
+    CompactionRequest cr = s.requestCompaction();
+    if (cr != null) {
+      if (priority != NO_PRIORITY) {
+        cr.setPriority(priority);
+      }
+      addedToQueue = compactionQueue.add(cr);
+      if (!addedToQueue) {
+        LOG.error("Could not add request to compaction queue: " + cr);
+        s.finishRequest(cr);
+      } else if (LOG.isDebugEnabled()) {
+        LOG.debug("Compaction requested: " + cr
+            + (why != null && !why.isEmpty() ? "; Because: " + why : "")
+            + "; Priority: " + priority + "; Compaction queue size: "
+            + compactionQueue.size());
+      }
     }
   }
 

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1181534&r1=1181533&r2=1181534&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 11 02:17:27 2011
@@ -46,6 +46,8 @@ import java.util.concurrent.atomic.Atomi
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
+import javax.print.attribute.standard.Finishings;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -76,6 +78,7 @@ import org.apache.hadoop.hbase.io.HeapSi
 import org.apache.hadoop.hbase.io.Reference.Range;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -89,6 +92,7 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
@@ -188,8 +192,8 @@ public class HRegion implements HeapSize
     volatile boolean flushing = false;
     // Set when a flush has been requested.
     volatile boolean flushRequested = false;
-    // Set while a compaction is running.
-    volatile boolean compacting = false;
+    // Number of compactions running.
+    volatile int compacting = 0;
     // Gets set in close. If set, cannot compact or flush again.
     volatile boolean writesEnabled = true;
     // Set if region is read-only
@@ -419,7 +423,7 @@ public class HRegion implements HeapSize
       this.writestate.setReadOnly(true);
     }
 
-    this.writestate.compacting = false;
+    this.writestate.compacting = 0;
     this.lastFlushTime = EnvironmentEdgeManager.currentTimeMillis();
     // Use maximum of log sequenceid or that which was found in stores
     // (particularly if no recovered edits, seqid will be -1).
@@ -577,12 +581,10 @@ public class HRegion implements HeapSize
         writestate.writesEnabled = false;
         wasFlushing = writestate.flushing;
         LOG.debug("Closing " + this + ": disabling compactions & flushes");
-        while (writestate.compacting || writestate.flushing) {
-          LOG.debug("waiting for" +
-              (writestate.compacting ? " compaction" : "") +
-              (writestate.flushing ?
-                  (writestate.compacting ? "," : "") + " cache flush" :
-                    "") + " to complete for region " + this);
+        while (writestate.compacting > 0 || writestate.flushing) {
+          LOG.debug("waiting for " + writestate.compacting + " compactions" +
+              (writestate.flushing ? " & cache flush" : "") +
+              " to complete for region " + this);
           try {
             writestate.wait();
           } catch (InterruptedException iex) {
@@ -693,11 +695,6 @@ public class HRegion implements HeapSize
     return this.fs;
   }
 
-  /** @return how info about the last compaction <time, size> */
-  public Pair<Long, Long> getLastCompactInfo() {
-    return this.lastCompactInfo;
-  }
-
   /** @return the last time the region was flushed */
   public long getLastFlushTime() {
     return this.lastFlushTime;
@@ -863,9 +860,9 @@ public class HRegion implements HeapSize
     return new Path(getRegionDir(), ".tmp");
   }
 
-  void setForceMajorCompaction(final boolean b) {
+  void triggerMajorCompaction() {
     for (Store h: stores.values()) {
-      h.setForceMajorCompaction(b);
+      h.triggerMajorCompaction();
     }
   }
 
@@ -886,7 +883,9 @@ public class HRegion implements HeapSize
    */
   byte [] compactStores(final boolean majorCompaction)
   throws IOException {
-    this.setForceMajorCompaction(majorCompaction);
+    if (majorCompaction) {
+      this.triggerMajorCompaction();
+    }
     return compactStores();
   }
 
@@ -895,16 +894,24 @@ public class HRegion implements HeapSize
    * to be split.
    */
   public byte[] compactStores() throws IOException {
-    byte[] splitRow = null;
     for(Store s : getStores().values()) {
-      if(splitRow == null) {
-        splitRow = compactStore(s);
+      CompactionRequest cr = s.requestCompaction();
+      if(cr != null) {
+        try {
+          compact(cr);
+        } finally {
+          s.finishRequest(cr);
+        }
+      }
+      byte[] splitRow = s.checkSplit();
+      if (splitRow != null) {
+        return splitRow;
       }
     }
-    return splitRow;
+    return null;
   }
 
-  /*
+  /**
    * Called by compaction thread and after region is opened to compact the
    * HStores if necessary.
    *
@@ -915,69 +922,52 @@ public class HRegion implements HeapSize
    * conflicts with a region split, and that cannot happen because the region
    * server does them sequentially and not in parallel.
    *
-   * @param majorCompaction True to force a major compaction regardless of thresholds
-   * @return split row if split is needed
+   * @param cr Compaction details, obtained by requestCompaction()
+   * @return whether the compaction completed
    * @throws IOException e
    */
-  public byte [] compactStore(Store store)
+  public boolean compact(CompactionRequest cr)
   throws IOException {
+    if (cr == null) {
+      return false;
+    }
     if (this.closing.get() || this.closed.get()) {
       LOG.debug("Skipping compaction on " + this + " because closing/closed");
-      return null;
+      return false;
     }
+    Preconditions.checkArgument(cr.getHRegion().toString() == this.toString());
     splitsAndClosesLock.readLock().lock();
-    this.lastCompactInfo = null;
     try {
-      byte [] splitRow = null;
       if (this.closed.get()) {
-        return splitRow;
+        return false;
       }
       try {
         synchronized (writestate) {
-          if (!writestate.compacting && writestate.writesEnabled) {
-            writestate.compacting = true;
+          if (writestate.writesEnabled) {
+            ++writestate.compacting;
           } else {
-            LOG.info("NOT compacting region " + this +
-                ": compacting=" + writestate.compacting + ", writesEnabled=" +
-                writestate.writesEnabled);
-              return splitRow;
+            LOG.info("NOT compacting region " + this + ". Writes disabled.");
+            return false;
           }
         }
         LOG.info("Starting compaction on region " + this);
-        long startTime = EnvironmentEdgeManager.currentTimeMillis();
         doRegionCompactionPrep();
-        long lastCompactSize = 0;
         boolean completed = false;
         try {
-          final Store.StoreSize ss = store.compact();
-          lastCompactSize += store.getLastCompactSize();
-          if (ss != null) {
-            splitRow = ss.getSplitRow();
-          }
+          cr.getStore().compact(cr);
           completed = true;
         } catch (InterruptedIOException iioe) {
           LOG.info("compaction interrupted by user: ", iioe);
-        } finally {
-          long now = EnvironmentEdgeManager.currentTimeMillis();
-          LOG.info(((completed) ? "completed" : "aborted")
-              + " compaction on region " + this
-              + " after " + StringUtils.formatTimeDiff(now, startTime));
-          if (completed) {
-            this.lastCompactInfo =
-              new Pair<Long,Long>((now - startTime) / 1000, lastCompactSize);
-          }
         }
+        return completed;
       } finally {
         synchronized (writestate) {
-          writestate.compacting = false;
-          writestate.notifyAll();
+          --writestate.compacting;
+          if (writestate.compacting <= 0) {
+            writestate.notifyAll();
+          }
         }
       }
-      if (splitRow != null) {
-        assert splitPoint == null || Bytes.equals(splitRow, splitPoint);
-        this.splitPoint = null; // clear the split point (if set)
-      }
-      return splitRow;
     } finally {
       splitsAndClosesLock.readLock().unlock();
     }
@@ -3361,10 +3351,12 @@ public class HRegion implements HeapSize
    * @param b
    * @return previous value
    */
-  public boolean shouldSplit(boolean b) {
-    boolean old = this.splitRequest;
-    this.splitRequest = b;
-    return old;
+  void triggerSplit() {
+    this.splitRequest = true;
+  }
+
+  boolean shouldSplit() {
+    return this.splitRequest;
   }
 
   byte[] getSplitPoint() {
@@ -3391,9 +3383,9 @@ public class HRegion implements HeapSize
    * store files
    * @return true if any store has too many store files
    */
-  public boolean hasTooManyStoreFiles() {
+  public boolean needsCompaction() {
     for(Store store : stores.values()) {
-      if(store.hasTooManyStoreFiles()) {
+      if(store.needsCompaction()) {
         return true;
       }
     }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1181534&r1=1181533&r2=1181534&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Tue Oct 11 02:17:27 2011
@@ -896,14 +896,18 @@ public class HRegionServer implements HR
       Set<Integer> keys = this.instance.onlineRegions.keySet();
       for (Integer i: keys) {
         HRegion r = this.instance.onlineRegions.get(i);
-        try {
-          if (r != null && r.isMajorCompaction()) {
-            // Queue a compaction.  Will recognize if major is needed.
-            this.instance.compactSplitThread.
-              requestCompaction(r, getName() + " requests major compaction");
+        if (r == null)
+          continue;
+        for (Store s : r.getStores().values()) {
+          try {
+            if (s.isMajorCompaction()) {
+              // Queue a compaction. Will recognize if major is needed.
+              this.instance.compactSplitThread.requestCompaction(r, s,
+                  getName() + " requests major compaction");
+            }
+          } catch (IOException e) {
+            LOG.warn("Failed major compaction check on " + r, e);
           }
-        } catch (IOException e) {
-          LOG.warn("Failed major compaction check on " + r, e);
         }
       }
     }
@@ -1359,11 +1363,13 @@ public class HRegionServer implements HR
             case MSG_REGION_SPLIT:
               region = getRegion(info.getRegionName());
               region.flushcache();
-              region.shouldSplit(true);
+              region.triggerSplit();
               region.setSplitPoint(info.getSplitPoint());
               // force a compaction; split will be side-effect.
+              // TODO: remove this. no correlation between compaction & split
+              // other than (1) references & (2) CompactSplitThread couples them
               compactSplitThread.requestCompaction(region,
-                false, e.msg.getType().name(),
+                e.msg.getType().name(),
                 CompactSplitThread.PRIORITY_USER);
               break;
 
@@ -1371,8 +1377,10 @@ public class HRegionServer implements HR
             case MSG_REGION_COMPACT:
               // Compact a region
               region = getRegion(info.getRegionName());
+              if (e.msg.isType(Type.MSG_REGION_MAJOR_COMPACT)) {
+                region.triggerMajorCompaction();
+              }
               compactSplitThread.requestCompaction(region,
-                e.msg.isType(Type.MSG_REGION_MAJOR_COMPACT),
                 e.msg.getType().name(),
                 CompactSplitThread.PRIORITY_USER);
               break;
@@ -1439,12 +1447,13 @@ public class HRegionServer implements HR
       try {
         zkUpdater.startRegionOpenEvent(null, true);
         region = instantiateRegion(regionInfo, this.hlog);
-        // Startup a compaction early if one is needed, if region has references
-        // or if a store has too many store files
-        if (region.hasReferences() || region.hasTooManyStoreFiles()) {
-          this.compactSplitThread.requestCompaction(region,
-            region.hasReferences() ? "Region has references on open" :
-                                     "Region has too many store files");
+        // Startup a compaction early if one is needed, if store has references
+        // or has too many store files
+        for (Store s : region.getStores().values()) {
+          if (s.hasReferences() || s.needsCompaction()) {
+            this.compactSplitThread.requestCompaction(region, s,
+                "Opening Region");
+          }
         }
       } catch (Throwable e) {
         Throwable t = cleanup(e,

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1181534&r1=1181533&r2=1181534&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Oct 11 02:17:27 2011
@@ -24,8 +24,10 @@ import java.io.InterruptedIOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.NavigableSet;
+import java.util.Set;
 import java.util.SortedSet;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -47,14 +49,18 @@ import org.apache.hadoop.hbase.io.HeapSi
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
 
 /**
  * A Store holds a column family in a Region.  Its a memstore and a set of zero
@@ -99,7 +105,7 @@ public class Store implements HeapSize {
   // With float, java will downcast your long to float for comparisons (bad)
   private double compactRatio;
   private long lastCompactSize = 0;
-  private volatile boolean forceMajor = false;
+  volatile boolean forceMajor = false;
   /* how many bytes to write between status checks */
   static int closeCheckInterval = 0;
   private final long desiredMaxFileSize;
@@ -116,12 +122,12 @@ public class Store implements HeapSize {
    */
   private ImmutableList<StoreFile> storefiles = null;
 
+  List<StoreFile> filesCompacting = Lists.newArrayList();
 
   // All access must be synchronized.
   private final CopyOnWriteArraySet<ChangedReadersObserver> changedReaderObservers =
     new CopyOnWriteArraySet<ChangedReadersObserver>();
 
-  private final Object compactLock = new Object();
   private final int blocksize;
   private final boolean blockcache;
   private final Compression.Algorithm compression;
@@ -537,7 +543,7 @@ public class Store implements HeapSize {
       // Tell listeners of the change in readers.
       notifyChangedReadersObservers();
 
-      return this.storefiles.size() >= this.minFilesToCompact;
+      return needsCompaction();
     } finally {
       this.lock.writeLock().unlock();
     }
@@ -583,106 +589,106 @@ public class Store implements HeapSize {
    * Existing StoreFiles are not destroyed until the new compacted StoreFile is
    * completely written-out to disk.
    *
-   * <p>The compactLock prevents multiple simultaneous compactions.
-   * The structureLock prevents us from interfering with other write operations.
-   *
-   * <p>We don't want to hold the structureLock for the whole time, as a compact()
-   * can be lengthy and we want to allow cache-flushes during this period.
-   *
-   * @return row to split around if a split is needed, null otherwise
+   * @param CompactionRequest
+   *          compaction details obtained from requestCompaction()
    * @throws IOException
    */
-  StoreSize compact() throws IOException {
-    boolean forceSplit = this.region.shouldSplit(false);
-    synchronized (compactLock) {
-      this.lastCompactSize = 0; // reset first in case compaction is aborted
+  void compact(CompactionRequest cr) throws IOException {
+    if (cr == null || cr.getFiles().isEmpty()) {
+      return;
+    }
+    Preconditions.checkArgument(cr.getStore().toString() == this.toString());
 
-      // sanity checks
-      for (StoreFile sf : this.storefiles) {
-        if (sf.getPath() == null || sf.getReader() == null) {
-          boolean np = sf.getPath() == null;
-          LOG.debug("StoreFile " + sf + " has null " + (np ? "Path":"Reader"));
-          return null;
-        }
-      }
+    List<StoreFile> filesToCompact = cr.getFiles();
 
-      // if the user wants to force a split, skip compaction unless necessary
-      boolean references = hasReferences(this.storefiles);
-      if (forceSplit && !this.forceMajor && !references) {
-        return checkSplit(forceSplit);
-      }
-
-      Collection<StoreFile> filesToCompact
-        = compactSelection(this.storefiles, this.forceMajor);
-
-      // empty == do not compact
-      if (filesToCompact.isEmpty()) {
-        // but do see if we need to split before returning
-        return checkSplit(forceSplit);
-      }
-
-      // sum size of all files included in compaction
-      long totalSize = 0;
-      for (StoreFile sf : filesToCompact) {
-        totalSize += sf.getReader().length();
-      }
-      this.lastCompactSize = totalSize;
-
-      // major compaction iff all StoreFiles are included
-      boolean majorcompaction
-        = (filesToCompact.size() == this.storefiles.size());
-      if (majorcompaction) {
-        this.forceMajor = false;
-      }
-
-      // Max-sequenceID is the last key in the files we're compacting
-      long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
-
-      // Ready to go.  Have list of files to compact.
-      LOG.info("Started compaction of " + filesToCompact.size() + " file(s) in " +
-          this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
-        (references? ", hasReferences=true,": " ") + " into " +
-          region.getTmpDir() + ", seqid=" + maxId +
-          ", totalSize=" + StringUtils.humanReadableInt(totalSize));
-      StoreFile.Writer writer
-        = compactStores(filesToCompact, majorcompaction, maxId);
+    synchronized (filesCompacting) {
+      // sanity check: we're compacting files that this store knows about
+      // TODO: change this to LOG.error() after more debugging
+      Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
+    }
+
+    // Max-sequenceID is the last key in the files we're compacting
+    long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
+
+    // Ready to go. Have list of files to compact.
+    LOG.info("Starting compaction of " + filesToCompact.size() + " file(s) in "
+        + this.storeNameStr + " of "
+        + this.region.getRegionInfo().getRegionNameAsString()
+        + " into " + region.getTmpDir() + ", seqid=" + maxId + ", totalSize="
+        + StringUtils.humanReadableInt(cr.getSize()));
+
+    StoreFile sf = null;
+    try {
+      StoreFile.Writer writer = compactStores(filesToCompact, cr.isMajor(), maxId);
       // Move the compaction into place.
-      StoreFile sf = completeCompaction(filesToCompact, writer);
-      if (LOG.isInfoEnabled()) {
-        LOG.info("Completed" + (majorcompaction? " major ": " ") +
-          "compaction of " + filesToCompact.size() + " file(s) in " +
-          this.storeNameStr + " of " + this.region.getRegionInfo().getRegionNameAsString() +
-          "; new storefile name=" + (sf == null? "none": sf.toString()) +
-          ", size=" + (sf == null? "none": StringUtils.humanReadableInt(sf.getReader().length())) +
-          "; total size for store is " + StringUtils.humanReadableInt(storeSize));
+      sf = completeCompaction(filesToCompact, writer);
+    } finally {
+      synchronized (filesCompacting) {
+        filesCompacting.removeAll(filesToCompact);
       }
     }
-    return checkSplit(forceSplit);
+
+    LOG.info("Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
+        + filesToCompact.size() + " file(s) in " + this.storeNameStr + " of "
+        + this.region.getRegionInfo().getRegionNameAsString()
+        + "; new storefile name=" + (sf == null ? "none" : sf.toString())
+        + ", size=" + (sf == null ? "none" :
+          StringUtils.humanReadableInt(sf.getReader().length()))
+        + "; total size for store is "
+        + StringUtils.humanReadableInt(storeSize));
   }
 
   /*
    * Compact the most recent N files. Essentially a hook for testing.
    */
   protected void compactRecent(int N) throws IOException {
-    synchronized(compactLock) {
-      List<StoreFile> filesToCompact = this.storefiles;
-      int count = filesToCompact.size();
-      if (N > count) {
-        throw new RuntimeException("Not enough files");
+    List<StoreFile> filesToCompact;
+    long maxId;
+    boolean isMajor;
+
+    this.lock.readLock().lock();
+    try {
+      synchronized (filesCompacting) {
+        filesToCompact = Lists.newArrayList(storefiles);
+        if (!filesCompacting.isEmpty()) {
+          // exclude all files older than the newest file we're currently
+          // compacting. this allows us to preserve contiguity (HBASE-2856)
+          StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
+          int idx = filesToCompact.indexOf(last);
+          Preconditions.checkArgument(idx != -1);
+          filesToCompact = filesToCompact.subList(idx+1, filesToCompact.size());
+        }
+        int count = filesToCompact.size();
+        if (N > count) {
+          throw new RuntimeException("Not enough files");
+        }
+
+        filesToCompact = filesToCompact.subList(count - N, count);
+        maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
+        isMajor = (filesToCompact.size() == storefiles.size());
+        filesCompacting.addAll(filesToCompact);
+        Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
       }
+    } finally {
+      this.lock.readLock().unlock();
+    }
 
-      filesToCompact = new ArrayList<StoreFile>(filesToCompact.subList(count-N, count));
-      long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact);
-      boolean majorcompaction = (N == count);
-
-      // Ready to go.  Have list of files to compact.
-      StoreFile.Writer writer
-        = compactStores(filesToCompact, majorcompaction, maxId);
+    try {
+      // Ready to go. Have list of files to compact.
+      StoreFile.Writer writer = compactStores(filesToCompact, isMajor, maxId);
       // Move the compaction into place.
       StoreFile sf = completeCompaction(filesToCompact, writer);
+    } finally {
+      synchronized (filesCompacting) {
+        filesCompacting.removeAll(filesToCompact);
+      }
     }
   }
 
+  boolean hasReferences() {
+    return hasReferences(this.storefiles);
+  }
+
   /*
    * @param files
    * @return True if any of the files in <code>files</code> are References.
@@ -794,6 +800,69 @@ public class Store implements HeapSize {
     return ret;
   }
 
+  public CompactionRequest requestCompaction() {
+    // don't even select for compaction if writes are disabled
+    if (!this.region.areWritesEnabled()) {
+      return null;
+    }
+
+    CompactionRequest ret = null;
+    this.lock.readLock().lock();
+    try {
+      synchronized (filesCompacting) {
+        // candidates = all storefiles not already in compaction queue
+        List<StoreFile> candidates = Lists.newArrayList(storefiles);
+        if (!filesCompacting.isEmpty()) {
+          // exclude all files older than the newest file we're currently
+          // compacting. this allows us to preserve contiguity (HBASE-2856)
+          StoreFile last = filesCompacting.get(filesCompacting.size() - 1);
+          int idx = candidates.indexOf(last);
+          Preconditions.checkArgument(idx != -1);
+          candidates = candidates.subList(idx + 1, candidates.size());
+        }
+        List<StoreFile> filesToCompact = compactSelection(candidates);
+
+        // no files to compact
+        if (filesToCompact.isEmpty()) {
+          return null;
+        }
+
+        // basic sanity check: do not try to compact the same StoreFile twice.
+        if (!Collections.disjoint(filesCompacting, filesToCompact)) {
+          // TODO: change this from an IAE to LOG.error after sufficient testing
+          Preconditions.checkArgument(false, "%s overlaps with %s",
+              filesToCompact, filesCompacting);
+        }
+        filesCompacting.addAll(filesToCompact);
+        Collections.sort(filesCompacting, StoreFile.Comparators.FLUSH_TIME);
+
+        // major compaction iff all StoreFiles are included
+        boolean isMajor = (filesToCompact.size() == this.storefiles.size());
+        if (isMajor) {
+          // since we're enqueuing a major, update the compaction wait interval
+          this.forceMajor = false;
+          this.majorCompactionTime = getNextMajorCompactTime();
+        }
+
+        // everything went better than expected. create a compaction request
+        int pri = getCompactPriority();
+        ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
+      }
+    } catch (IOException ex) {
+      LOG.error("Compaction Request failed for region " + region + ", store "
+          + this, RemoteExceptionHandler.checkIOException(ex));
+    } finally {
+      this.lock.readLock().unlock();
+    }
+    return ret;
+  }
+
+  public void finishRequest(CompactionRequest cr) {
+    synchronized (filesCompacting) {
+      filesCompacting.removeAll(cr.getFiles());
+    }
+  }
+
   /**
    * Algorithm to choose which files to compact
    *
@@ -810,12 +879,13 @@ public class Store implements HeapSize {
    *    max files to compact at once (avoids OOM)
    *
    * @param candidates candidate files, ordered from oldest to newest
-   * @param majorcompaction whether to force a major compaction
    * @return subset copy of candidate list that meets compaction criteria
    * @throws IOException
    */
-  List<StoreFile> compactSelection(List<StoreFile> candidates,
-      boolean forcemajor) throws IOException {
+  List<StoreFile> compactSelection(List<StoreFile> candidates)
+      throws IOException {
+    // ASSUMPTION!!! filesCompacting is locked when calling this function
+
     /* normal skew:
      *
      *         older ----> newer
@@ -829,6 +899,7 @@ public class Store implements HeapSize {
      */
     List<StoreFile> filesToCompact = new ArrayList<StoreFile>(candidates);
 
+    boolean forcemajor = this.forceMajor && filesCompacting.isEmpty();
     if (!forcemajor) {
       // do not compact old files above a configurable threshold
       // save all references. we MUST compact them
@@ -847,9 +918,6 @@ public class Store implements HeapSize {
     // major compact on user action or age (caveat: we have too many files)
     boolean majorcompaction = (forcemajor || isMajorCompaction(filesToCompact))
       && filesToCompact.size() < this.maxFilesToCompact;
-    if (majorcompaction) {
-      this.majorCompactionTime = getNextMajorCompactTime();
-    }
 
     if (!majorcompaction && !hasReferences(filesToCompact)) {
       // we're doing a minor compaction, let's see what files are applicable
@@ -1022,9 +1090,6 @@ public class Store implements HeapSize {
   }
 
   /*
-   * It's assumed that the compactLock  will be acquired prior to calling this
-   * method!  Otherwise, it is not thread-safe!
-   *
    * <p>It works by processing a compaction that's been written to disk.
    *
    * <p>It is usually invoked at the end of a compaction, but might also be
@@ -1065,18 +1130,13 @@ public class Store implements HeapSize {
     this.lock.writeLock().lock();
     try {
       try {
-        // 2. Unloading
-        // 3. Loading the new TreeMap.
         // Change this.storefiles so it reflects new state but do not
         // delete old store files until we have sent out notification of
         // change in case old files are still being accessed by outstanding
         // scanners.
-        ArrayList<StoreFile> newStoreFiles = new ArrayList<StoreFile>();
-        for (StoreFile sf : storefiles) {
-          if (!compactedFiles.contains(sf)) {
-            newStoreFiles.add(sf);
-          }
-        }
+        ArrayList<StoreFile> newStoreFiles = Lists.newArrayList(storefiles);
+        newStoreFiles.removeAll(compactedFiles);
+        filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock()
 
         // If a StoreFile result, move it into place.  May be null.
         if (result != null) {
@@ -1286,13 +1346,13 @@ public class Store implements HeapSize {
   }
 
   /**
-   * Determines if HStore can be split
-   * @param force Whether to force a split or not.
-   * @return a StoreSize if store can be split, null otherwise.
+   * Determines if Store should be split
+   * @return byte[] if store should be split, null otherwise.
    */
-  StoreSize checkSplit(final boolean force) {
+  byte[] checkSplit() {
     this.lock.readLock().lock();
     try {
+      boolean force = this.region.shouldSplit();
       // sanity checks
       if (!force) {
         if (storeSize < this.desiredMaxFileSize || this.storefiles.isEmpty()) {
@@ -1336,7 +1396,7 @@ public class Store implements HeapSize {
       }
       // if the user explicit set a split point, use that
       if (this.region.getSplitPoint() != null) {
-        return new StoreSize(maxSize, this.region.getSplitPoint());
+        return this.region.getSplitPoint();
       }
       StoreFile.Reader r = largestSf.getReader();
       if (r == null) {
@@ -1363,7 +1423,7 @@ public class Store implements HeapSize {
           }
           return null;
         }
-        return new StoreSize(maxSize, mk.getRow());
+        return mk.getRow();
       }
     } catch(IOException e) {
       LOG.warn("Failed getting store size for " + this.storeNameStr, e);
@@ -1383,8 +1443,8 @@ public class Store implements HeapSize {
     return storeSize;
   }
 
-  void setForceMajorCompaction(final boolean b) {
-    this.forceMajor = b;
+  void triggerMajorCompaction() {
+    this.forceMajor = true;
   }
 
   boolean getForceMajorCompaction() {
@@ -1460,28 +1520,6 @@ public class Store implements HeapSize {
     return this.blockingStoreFileCount - this.storefiles.size();
   }
 
-  /**
-   * Datastructure that holds size and row to split a file around.
-   * TODO: Take a KeyValue rather than row.
-   */
-  static class StoreSize {
-    private final long size;
-    private final byte [] row;
-
-    StoreSize(long size, byte [] row) {
-      this.size = size;
-      this.row = row;
-    }
-    /* @return the size */
-    long getSize() {
-      return size;
-    }
-
-    byte [] getSplitRow() {
-      return this.row;
-    }
-  }
-
   HRegion getHRegion() {
     return this.region;
   }
@@ -1566,8 +1604,8 @@ public class Store implements HeapSize {
    * @return true if number of store files is greater than
    *  the number defined in minFilesToCompact
    */
-  public boolean hasTooManyStoreFiles() {
-    return this.storefiles.size() > this.minFilesToCompact;
+  public boolean needsCompaction() {
+    return (storefiles.size() - filesCompacting.size()) > minFilesToCompact;
   }
 
   public static final long FIXED_OVERHEAD = ClassSize.align(

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1181534&r1=1181533&r2=1181534&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Tue Oct 11 02:17:27 2011
@@ -1,11 +1,13 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.util.Date;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 
   /**
    * This class represents a compaction request and holds the region, priority,
@@ -15,30 +17,37 @@ import org.apache.hadoop.hbase.regionser
     static final Log LOG = LogFactory.getLog(CompactionRequest.class);
     private final HRegion r;
     private final Store s;
+    private final List<StoreFile> files;
+    private final long totalSize;
+    private final boolean isMajor;
     private int p;
     private final Date date;
 
     public CompactionRequest(HRegion r, Store s) {
-      this(r, s, s.getCompactPriority());
+      this(r, s, null, false, s.getCompactPriority());
     }
 
     public CompactionRequest(HRegion r, Store s, int p) {
-      this(r, s, p, null);
+      this(r, s, null, false, p);
     }
 
-    public CompactionRequest(HRegion r, Store s, int p, Date d) {
+    public CompactionRequest(HRegion r, Store s,
+        List<StoreFile> files, boolean isMajor, int p) {
       if (r == null) {
         throw new NullPointerException("HRegion cannot be null");
       }
 
-      if (d == null) {
-        d = new Date();
-      }
-
       this.r = r;
       this.s = s;
+      this.files = files;
+      long sz = 0;
+      for (StoreFile sf : files) {
+        sz += sf.getReader().length();
+      }
+      this.totalSize = sz;
+      this.isMajor = isMajor;
       this.p = p;
-      this.date = d;
+      this.date = new Date();
     }
 
     /**
@@ -70,8 +79,8 @@ import org.apache.hadoop.hbase.regionser
         return compareVal;
       }
 
-      //break the tie arbitrarily
-      return -1;
+      // break the tie based on hash code
+      return this.hashCode() - request.hashCode();
     }
 
     /** Gets the HRegion for the request */
@@ -84,6 +93,20 @@ import org.apache.hadoop.hbase.regionser
       return s;
     }
 
+    /** Gets the StoreFiles for the request */
+    public List<StoreFile> getFiles() {
+      return files;
+    }
+
+    /** Gets the total size of all StoreFiles in compaction */
+    public long getSize() {
+      return totalSize;
+    }
+
+    public boolean isMajor() {
+      return this.isMajor;
+    }
+
     /** Gets the priority for the request */
     public int getPriority() {
       return p;
@@ -96,7 +119,8 @@ import org.apache.hadoop.hbase.regionser
 
     public String toString() {
       return "regionName=" + r.getRegionNameAsString() +
-        "storeName = " + new String(s.getFamily().getName()) +
+        ", storeName=" + new String(s.getFamily().getName()) +
+        ", fileCount=" + files.size() +
         ", priority=" + p + ", date=" + date;
     }
   }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java?rev=1181534&r1=1181533&r2=1181534&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/metrics/RegionServerMetrics.java Tue Oct 11 02:17:27 2011
@@ -329,11 +329,12 @@ public class RegionServerMetrics impleme
   }
 
   /**
-   * @param compact history in <time, size>
+   * @param time time that compaction took
+   * @param size bytesize of storefiles in the compaction
    */
-  public synchronized void addCompaction(final Pair<Long,Long> compact) {
-    this.compactionTime.inc(compact.getFirst());
-    this.compactionSize.inc(compact.getSecond());
+  public synchronized void addCompaction(long time, long size) {
+    this.compactionTime.inc(time);
+    this.compactionSize.inc(size);
   }
 
   /**

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java?rev=1181534&r1=1181533&r2=1181534&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompactSelection.java Tue Oct 11 02:17:27 2011
@@ -158,7 +158,9 @@ public class TestCompactSelection extend
   void compactEquals(List<StoreFile> candidates, boolean forcemajor,
       long ... expected)
   throws IOException {
-    List<StoreFile> actual = store.compactSelection(candidates, forcemajor);
+    store.forceMajor = forcemajor;
+    List<StoreFile> actual = store.compactSelection(candidates);
+    store.forceMajor = false;
     assertEquals(Arrays.toString(expected), Arrays.toString(getSizes(actual)));
   }
 
@@ -187,7 +189,7 @@ public class TestCompactSelection extend
      */
     // don't exceed max file compact threshold
     assertEquals(maxFiles,
-        store.compactSelection(sfCreate(7,6,5,4,3,2,1), false).size());
+        store.compactSelection(sfCreate(7,6,5,4,3,2,1)).size());
 
     /* MAJOR COMPACTION */
     // if a major compaction has been forced, then compact everything
@@ -197,8 +199,10 @@ public class TestCompactSelection extend
     // even if one of those files is too big
     compactEquals(sfCreate(tooBig, 12,12), true, tooBig, 12, 12);
     // don't exceed max file compact threshold, even with major compaction
+    store.forceMajor = true;
     assertEquals(maxFiles,
-        store.compactSelection(sfCreate(7,6,5,4,3,2,1), true).size());
+        store.compactSelection(sfCreate(7,6,5,4,3,2,1)).size());
+    store.forceMajor = false;
     // if we exceed maxCompactSize, downgrade to minor
     // if not, it creates a 'snowball effect' when files >> maxCompactSize:
     // the last file in compaction is the aggregate of all previous compactions
@@ -217,7 +221,7 @@ public class TestCompactSelection extend
     compactEquals(sfCreate(true, tooBig, 12,12), tooBig, 12, 12);
     // reference files should obey max file compact to avoid OOM
     assertEquals(maxFiles,
-        store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1), false).size());
+        store.compactSelection(sfCreate(true, 7,6,5,4,3,2,1)).size());
 
     // empty case
     compactEquals(new ArrayList<StoreFile>() /* empty */);

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=1181534&r1=1181533&r2=1181534&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Tue Oct 11 02:17:27 2011
@@ -154,7 +154,7 @@ public class TestStore extends TestCase 
     assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);
 
     // after compact; check the lowest time stamp
-    store.compact();
+    store.compact(store.requestCompaction());
     lowestTimeStampFromStore = Store.getLowestTimestamp(store.getStorefiles());
     lowestTimeStampFromFS = getLowestTimeStampFromFS(fs,store.getStorefiles());
     assertEquals(lowestTimeStampFromStore,lowestTimeStampFromFS);