You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2013/02/26 21:52:00 UTC

svn commit: r1450407 [1/2] - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/coprocessor/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/compactions/ test/java/org/apache/hadoop/hbase/...

Author: sershe
Date: Tue Feb 26 20:51:59 2013
New Revision: 1450407

URL: http://svn.apache.org/r1450407
Log:
HBASE-7843 Enable encapsulating compaction policy/compactor/store file manager interaction shenanigans

Added:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java
Removed:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/PerfTestCompactionPolicies.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java Tue Feb 26 20:51:59 2013
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Tue Feb 26 20:51:59 2013
@@ -26,6 +26,7 @@ import java.util.concurrent.BlockingQueu
 import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -34,7 +35,12 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
 
@@ -89,8 +95,7 @@ public class CompactSplitThread implemen
             return t;
           }
       });
-    this.largeCompactions
-        .setRejectedExecutionHandler(new CompactionRequest.Rejection());
+    this.largeCompactions.setRejectedExecutionHandler(new Rejection());
     this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
         60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
         new ThreadFactory() {
@@ -102,7 +107,7 @@ public class CompactSplitThread implemen
           }
       });
     this.smallCompactions
-        .setRejectedExecutionHandler(new CompactionRequest.Rejection());
+        .setRejectedExecutionHandler(new Rejection());
     this.splits = (ThreadPoolExecutor)
         Executors.newFixedThreadPool(splitThreads,
             new ThreadFactory() {
@@ -193,7 +198,7 @@ public class CompactSplitThread implemen
 
   @Override
   public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
-      List<CompactionRequest> requests) throws IOException {
+      List<Pair<CompactionRequest, Store>> requests) throws IOException {
     return requestCompaction(r, why, Store.NO_PRIORITY, requests);
   }
 
@@ -205,7 +210,7 @@ public class CompactSplitThread implemen
 
   @Override
   public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
-      int p, List<CompactionRequest> requests) throws IOException {
+      int p, List<Pair<CompactionRequest, Store>> requests) throws IOException {
     // not a special compaction request, so make our own list
     List<CompactionRequest> ret;
     if (requests == null) {
@@ -215,8 +220,8 @@ public class CompactSplitThread implemen
       }
     } else {
       ret = new ArrayList<CompactionRequest>(requests.size());
-      for (CompactionRequest request : requests) {
-        requests.add(requestCompaction(r, request.getStore(), why, p, request));
+      for (Pair<CompactionRequest, Store> pair : requests) {
+        ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
       }
     }
     return ret;
@@ -235,28 +240,29 @@ public class CompactSplitThread implemen
     if (this.server.isStopped()) {
       return null;
     }
-    CompactionRequest cr = s.requestCompaction(priority, request);
-    if (cr != null) {
-      cr.setServer(server);
-      if (priority != Store.NO_PRIORITY) {
-        cr.setPriority(priority);
-      }
-      ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
-        ? largeCompactions : smallCompactions;
-      pool.execute(cr);
-      if (LOG.isDebugEnabled()) {
-        String type = (pool == smallCompactions) ? "Small " : "Large ";
-        LOG.debug(type + "Compaction requested: " + cr
-            + (why != null && !why.isEmpty() ? "; Because: " + why : "")
-            + "; " + this);
-      }
-    } else {
+    CompactionContext compaction = s.requestCompaction(priority, request);
+    if (compaction == null) {
       if(LOG.isDebugEnabled()) {
         LOG.debug("Not compacting " + r.getRegionNameAsString() + 
             " because compaction request was cancelled");
       }
+      return null;
+    }
+
+    assert compaction.hasSelection();
+    if (priority != Store.NO_PRIORITY) {
+      compaction.getRequest().setPriority(priority);
     }
-    return cr;
+    ThreadPoolExecutor pool = s.throttleCompaction(compaction.getRequest().getSize())
+      ? largeCompactions : smallCompactions;
+    pool.execute(new CompactionRunner(s, r, compaction));
+    if (LOG.isDebugEnabled()) {
+      String type = (pool == smallCompactions) ? "Small " : "Large ";
+      LOG.debug(type + "Compaction requested: " + compaction
+          + (why != null && !why.isEmpty() ? "; Because: " + why : "")
+          + "; " + this);
+    }
+    return compaction.getRequest();
   }
 
   /**
@@ -309,4 +315,73 @@ public class CompactSplitThread implemen
   public int getRegionSplitLimit() {
     return this.regionSplitLimit;
   }
+
+  private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
+    private final Store store;
+    private final HRegion region;
+    private final CompactionContext compaction;
+
+    public CompactionRunner(Store store, HRegion region, CompactionContext compaction) {
+      super();
+      this.store = store;
+      this.region = region;
+      this.compaction = compaction;
+    }
+
+    @Override
+    public void run() {
+      Preconditions.checkNotNull(server);
+      if (server.isStopped()) {
+        return;
+      }
+      this.compaction.getRequest().beforeExecute();
+      try {
+        // Note: please don't put single-compaction logic here;
+        //       put it into region/store/etc. This is CST logic.
+        long start = EnvironmentEdgeManager.currentTimeMillis();
+        boolean completed = region.compact(compaction, store);
+        long now = EnvironmentEdgeManager.currentTimeMillis();
+        LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
+              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
+        if (completed) {
+          // degenerate case: blocked regions require recursive enqueues
+          if (store.getCompactPriority() <= 0) {
+            requestCompaction(region, store, "Recursive enqueue", null);
+          } else {
+            // see if the compaction has caused us to exceed max region size
+            requestSplit(region);
+          }
+        }
+      } catch (IOException ex) {
+        LOG.error("Compaction failed " + this, RemoteExceptionHandler.checkIOException(ex));
+        server.checkFileSystem();
+      } catch (Exception ex) {
+        LOG.error("Compaction failed " + this, ex);
+        server.checkFileSystem();
+      } finally {
+        LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
+      }
+      this.compaction.getRequest().afterExecute();
+    }
+
+    @Override
+    public int compareTo(CompactionRunner o) {
+      // Only compare the underlying request, for queue sorting purposes.
+      return this.compaction.getRequest().compareTo(o.compaction.getRequest());
+    }
+  }
+
+  /**
+   * Cleanup class to use when rejecting a compaction request from the queue.
+   */
+  private static class Rejection implements RejectedExecutionHandler {
+    @Override
+    public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
+      if (runnable instanceof CompactionRunner) {
+        CompactionRunner runner = (CompactionRunner)runnable;
+        LOG.debug("Compaction Rejected: " + runner);
+        runner.store.cancelRequestedCompaction(runner.compaction);
+      }
+    }
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java Tue Feb 26 20:51:59 2013
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.Pair;
 
 @InterfaceAudience.Private
 public interface CompactionRequestor {
@@ -47,7 +48,7 @@ public interface CompactionRequestor {
    * @throws IOException
    */
   public List<CompactionRequest> requestCompaction(final HRegion r, final String why,
-      List<CompactionRequest> requests)
+      List<Pair<CompactionRequest, Store>> requests)
       throws IOException;
 
   /**
@@ -74,7 +75,7 @@ public interface CompactionRequestor {
    * @throws IOException
    */
   public List<CompactionRequest> requestCompaction(final HRegion r, final String why, int pri,
-      List<CompactionRequest> requests) throws IOException;
+      List<Pair<CompactionRequest, Store>> requests) throws IOException;
 
   /**
    * @param r Region to compact

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java Tue Feb 26 20:51:59 2013
@@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.HDFSBlock
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.mapreduce.JobUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -156,8 +156,9 @@ public class CompactionTool extends Conf
         " family=" + familyDir.getName());
       HStore store = getStore(region, familyDir);
       do {
-        CompactionRequest cr = store.requestCompaction();
-        List<StoreFile> storeFiles = store.compact(cr);
+        CompactionContext compaction = store.requestCompaction();
+        if (compaction == null) break;
+        List<StoreFile> storeFiles = store.compact(compaction);
         if (storeFiles != null && !storeFiles.isEmpty()) {
           if (keepCompactedFiles && deleteCompacted) {
             for (StoreFile storeFile: storeFiles) {
@@ -465,4 +466,4 @@ public class CompactionTool extends Conf
   public static void main(String[] args) throws Exception {
     System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
   }
-}
\ No newline at end of file
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java Tue Feb 26 20:51:59 2013
@@ -18,28 +18,64 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 
 /**
- * Default StoreEngine creates the default compactor, policy, and store file manager.
+ * Default StoreEngine creates the default compactor, policy, and store file manager, or
+ * their derivatives.
  */
 @InterfaceAudience.Private
-public class DefaultStoreEngine extends StoreEngine {
+public class DefaultStoreEngine extends StoreEngine<
+  DefaultCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
+
   public DefaultStoreEngine(Configuration conf, Store store, KVComparator comparator) {
     super(conf, store, comparator);
   }
 
   @Override
-  protected void createComponents(PP<StoreFileManager> storeFileManager,
-      PP<CompactionPolicy> compactionPolicy, PP<Compactor> compactor) {
-    storeFileManager.set(new DefaultStoreFileManager(this.comparator));
-    compactionPolicy.set(new DefaultCompactionPolicy(this.conf, this.store));
-    compactor.set(new DefaultCompactor(this.conf, this.store));
+  protected void createComponents() {
+    storeFileManager = new DefaultStoreFileManager(this.comparator);
+
+    // TODO: compactor and policy may be separately pluggable, but must derive from default ones.
+    compactor = new DefaultCompactor(this.conf, this.store);
+    compactionPolicy = new DefaultCompactionPolicy(this.conf, this.store/*as StoreConfigInfo*/);
+  }
+
+  @Override
+  protected CompactionContext createCompactionContext() {
+    return new DefaultCompactionContext();
+  }
+
+  private class DefaultCompactionContext extends CompactionContext {
+    @Override
+    public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
+        boolean mayUseOffPeak, boolean forceMajor) throws IOException {
+      request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
+          filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
+      return request != null;
+    }
+
+    @Override
+    public List<Path> compact() throws IOException {
+      return compactor.compact(request);
+    }
+
+    @Override
+    public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
+      return compactionPolicy.preSelectCompactionForCoprocessor(
+          storeFileManager.getStorefiles(), filesCompacting);
+    }
   }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Feb 26 20:51:59 2013
@@ -119,6 +119,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
 import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
 import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
@@ -1291,13 +1292,9 @@ public class HRegion implements HeapSize
    */
   public void compactStores() throws IOException {
     for (Store s : getStores().values()) {
-      CompactionRequest cr = s.requestCompaction();
-      if(cr != null) {
-        try {
-          compact(cr);
-        } finally {
-          s.finishRequest(cr);
-        }
+      CompactionContext compaction = s.requestCompaction();
+      if (compaction != null) {
+        compact(compaction, s);
       }
     }
   }
@@ -1317,45 +1314,46 @@ public class HRegion implements HeapSize
    * @return whether the compaction completed
    * @throws IOException e
    */
-  public boolean compact(CompactionRequest cr)
-  throws IOException {
-    if (cr == null) {
-      return false;
-    }
+  public boolean compact(CompactionContext compaction, Store store) throws IOException {
+    assert compaction != null && compaction.hasSelection();
+    assert !compaction.getRequest().getFiles().isEmpty();
     if (this.closing.get() || this.closed.get()) {
       LOG.debug("Skipping compaction on " + this + " because closing/closed");
+      store.cancelRequestedCompaction(compaction);
       return false;
     }
-    Preconditions.checkArgument(cr.getHRegion().equals(this));
     MonitoredTask status = null;
+    boolean didPerformCompaction = false;
     // block waiting for the lock for compaction
     lock.readLock().lock();
     try {
-      status = TaskMonitor.get().createStatus(
-        "Compacting " + cr.getStore() + " in " + this);
+      status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
       if (this.closed.get()) {
-        LOG.debug("Skipping compaction on " + this + " because closed");
+        String msg = "Skipping compaction on " + this + " because closed";
+        LOG.debug(msg);
+        status.abort(msg);
         return false;
       }
-      boolean decr = true;
+      boolean wasStateSet = false;
       try {
         synchronized (writestate) {
           if (writestate.writesEnabled) {
+            wasStateSet = true;
             ++writestate.compacting;
           } else {
             String msg = "NOT compacting region " + this + ". Writes disabled.";
             LOG.info(msg);
             status.abort(msg);
-            decr = false;
             return false;
           }
         }
-        LOG.info("Starting compaction on " + cr.getStore() + " in region "
-            + this + (cr.getCompactSelection().isOffPeakCompaction()?" as an off-peak compaction":""));
+        LOG.info("Starting compaction on " + store + " in region " + this
+            + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
         doRegionCompactionPrep();
         try {
-          status.setStatus("Compacting store " + cr.getStore());
-          cr.getStore().compact(cr);
+          status.setStatus("Compacting store " + store);
+          didPerformCompaction = true;
+          store.compact(compaction);
         } catch (InterruptedIOException iioe) {
           String msg = "compaction interrupted";
           LOG.info(msg, iioe);
@@ -1363,7 +1361,7 @@ public class HRegion implements HeapSize
           return false;
         }
       } finally {
-        if (decr) {
+        if (wasStateSet) {
           synchronized (writestate) {
             --writestate.compacting;
             if (writestate.compacting <= 0) {
@@ -1376,6 +1374,7 @@ public class HRegion implements HeapSize
       return true;
     } finally {
       try {
+        if (!didPerformCompaction) store.cancelRequestedCompaction(compaction);
         if (status != null) status.cleanup();
       } finally {
         lock.readLock().unlock();

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Tue Feb 26 20:51:59 2013
@@ -66,7 +66,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.exceptions.InvalidHFileException;
 import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -154,8 +154,8 @@ public class HStore implements Store {
   // Comparing KeyValues
   private final KeyValue.KVComparator comparator;
 
-  final Compactor compactor;
-  
+  final StoreEngine<?, ?, ?> storeEngine;
+
   private OffPeakCompactions offPeakCompactions;
 
   private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
@@ -223,8 +223,11 @@ public class HStore implements Store {
           "hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
     }
 
-    StoreEngine engine = StoreEngine.create(this, this.conf, this.comparator);
-    this.storeFileManager = engine.getStoreFileManager();
+    storeEngine = StoreEngine.create(this, this.conf, this.comparator);
+    // Copy some things to local fields for convenience.
+    this.storeFileManager = storeEngine.getStoreFileManager();
+    this.compactionPolicy = storeEngine.getCompactionPolicy();
+
     this.storeFileManager.loadFiles(loadStoreFiles());
 
     // Initialize checksum type from name. The names are CRC32, CRC32C, etc.
@@ -243,9 +246,6 @@ public class HStore implements Store {
                 + HStore.flush_retries_number);
       }
     }
-    this.compactionPolicy = engine.getCompactionPolicy();
-    // Get the compaction tool instance for this policy
-    this.compactor = engine.getCompactor();
   }
 
   /**
@@ -1067,15 +1067,15 @@ public class HStore implements Store {
    * <p>We don't want to hold the structureLock for the whole time, as a compact()
    * can be lengthy and we want to allow cache-flushes during this period.
    *
-   * @param cr
-   *          compaction details obtained from requestCompaction()
+   * @param compaction compaction details obtained from requestCompaction()
    * @throws IOException
    * @return Storefile we compacted into or null if we failed or opted out early.
    */
-  List<StoreFile> compact(CompactionRequest cr) throws IOException {
-    if (cr == null || cr.getFiles().isEmpty()) return null;
-    Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
-    List<StoreFile> filesToCompact = cr.getFiles();
+  public List<StoreFile> compact(CompactionContext compaction) throws IOException {
+    assert compaction != null && compaction.hasSelection();
+    CompactionRequest cr = compaction.getRequest();
+    Collection<StoreFile> filesToCompact = cr.getFiles();
+    assert !filesToCompact.isEmpty();
     synchronized (filesCompacting) {
       // sanity check: we're compacting files that this store knows about
       // TODO: change this to LOG.error() after more debugging
@@ -1091,16 +1091,20 @@ public class HStore implements Store {
     List<StoreFile> sfs = new ArrayList<StoreFile>();
     long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
     try {
-      List<Path> newFiles = this.compactor.compact(cr);
+      // Commence the compaction.
+      List<Path> newFiles = compaction.compact();
       // Move the compaction into place.
       if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
         for (Path newFile: newFiles) {
-          StoreFile sf = completeCompaction(filesToCompact, newFile);
+          assert newFile != null;
+          StoreFile sf = moveFileIntoPlace(newFile);
           if (region.getCoprocessorHost() != null) {
             region.getCoprocessorHost().postCompact(this, sf, cr);
           }
+          assert sf != null;
           sfs.add(sf);
         }
+        completeCompaction(filesToCompact, sfs);
       } else {
         for (Path newFile: newFiles) {
           // Create storefile around what we wrote with a reader on it.
@@ -1111,15 +1115,24 @@ public class HStore implements Store {
         }
       }
     } finally {
-      synchronized (filesCompacting) {
-        filesCompacting.removeAll(filesToCompact);
-      }
+      finishCompactionRequest(cr);
     }
+    logCompactionEndMessage(cr, sfs, compactionStartTime);
+    return sfs;
+  }
 
+  /**
+   * Log a very elaborate compaction completion message.
+   * @param cr Request.
+   * @param sfs Resulting files.
+   * @param compactionStartTime Start time.
+   */
+  private void logCompactionEndMessage(
+      CompactionRequest cr, List<StoreFile> sfs, long compactionStartTime) {
     long now = EnvironmentEdgeManager.currentTimeMillis();
     StringBuilder message = new StringBuilder(
       "Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
-      + filesToCompact.size() + " file(s) in " + this + " of "
+      + cr.getFiles().size() + " file(s) in " + this + " of "
       + this.region.getRegionInfo().getRegionNameAsString()
       + " into ");
     if (sfs.isEmpty()) {
@@ -1139,7 +1152,23 @@ public class HStore implements Store {
       .append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
       .append(" to execute.");
     LOG.info(message.toString());
-    return sfs;
+  }
+
+  // Package-visible for tests
+  StoreFile moveFileIntoPlace(Path newFile) throws IOException {
+    validateStoreFile(newFile);
+    // Move the file into the right spot
+    Path destPath = new Path(homedir, newFile.getName());
+    LOG.info("Renaming compacted file at " + newFile + " to " + destPath);
+    if (!fs.rename(newFile, destPath)) {
+      String err = "Failed move of compacted file " + newFile + " to " +  destPath;
+      LOG.error(err);
+      throw new IOException(err);
+    }
+    StoreFile result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
+        this.family.getBloomFilterType(), this.dataBlockEncoder);
+    result.createReader();
+    return result;
   }
 
   /**
@@ -1181,13 +1210,17 @@ public class HStore implements Store {
 
     try {
       // Ready to go. Have list of files to compact.
-      List<Path> newFiles = this.compactor.compactForTesting(filesToCompact, isMajor);
+      List<Path> newFiles =
+          this.storeEngine.getCompactor().compactForTesting(filesToCompact, isMajor);
       for (Path newFile: newFiles) {
         // Move the compaction into place.
-        StoreFile sf = completeCompaction(filesToCompact, newFile);
+        StoreFile sf = moveFileIntoPlace(newFile);
         if (region.getCoprocessorHost() != null) {
           region.getCoprocessorHost().postCompact(this, sf, null);
         }
+        ArrayList<StoreFile> tmp = new ArrayList<StoreFile>();
+        tmp.add(sf);
+        completeCompaction(filesToCompact, tmp);
       }
     } finally {
       synchronized (filesCompacting) {
@@ -1203,7 +1236,7 @@ public class HStore implements Store {
 
   @Override
   public CompactionProgress getCompactionProgress() {
-    return this.compactor.getProgress();
+    return this.storeEngine.getCompactor().getProgress();
   }
 
   @Override
@@ -1219,100 +1252,102 @@ public class HStore implements Store {
   }
 
   @Override
-  public CompactionRequest requestCompaction() throws IOException {
+  public CompactionContext requestCompaction() throws IOException {
     return requestCompaction(Store.NO_PRIORITY, null);
   }
 
   @Override
-  public CompactionRequest requestCompaction(int priority, CompactionRequest request)
+  public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
       throws IOException {
     // don't even select for compaction if writes are disabled
     if (!this.region.areWritesEnabled()) {
       return null;
     }
 
+    CompactionContext compaction = storeEngine.createCompaction();
     this.lock.readLock().lock();
     try {
-      List<StoreFile> candidates = Lists.newArrayList(storeFileManager.getStorefiles());
       synchronized (filesCompacting) {
-        // First we need to pre-select compaction, and then pre-compact selection!
-        candidates = compactionPolicy.preSelectCompaction(candidates, filesCompacting);
-        boolean override = false;
+        // First, see if coprocessor would want to override selection.
         if (region.getCoprocessorHost() != null) {
-          override = region.getCoprocessorHost().preCompactSelection(this, candidates, request);
+          List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
+          boolean override = region.getCoprocessorHost().preCompactSelection(
+              this, candidatesForCoproc, baseRequest);
+          if (override) {
+            // Coprocessor is overriding normal file selection.
+            compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
+          }
         }
-        CompactSelection filesToCompact;
-        if (override) {
-          // coprocessor is overriding normal file selection
-          filesToCompact = new CompactSelection(candidates);
-        } else {
+
+        // Normal case - coprocessor is not overriding file selection.
+        if (!compaction.hasSelection()) {
           boolean isUserCompaction = priority == Store.PRIORITY_USER;
           boolean mayUseOffPeak = this.offPeakCompactions.tryStartOffPeakRequest();
-          filesToCompact = compactionPolicy.selectCompaction(candidates, isUserCompaction,
+          compaction.select(this.filesCompacting, isUserCompaction,
               mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
-          if (mayUseOffPeak && !filesToCompact.isOffPeakCompaction()) {
-            // Compaction policy doesn't want to do anything with off-peak.
+          assert compaction.hasSelection();
+          if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
+            // Compaction policy doesn't want to take advantage of off-peak.
             this.offPeakCompactions.endOffPeakRequest();
           }
         }
-
         if (region.getCoprocessorHost() != null) {
-          region.getCoprocessorHost().postCompactSelection(this,
-            ImmutableList.copyOf(filesToCompact.getFilesToCompact()), request);
+          region.getCoprocessorHost().postCompactSelection(
+              this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
+        }
+
+        // Selected files; see if we have a compaction with some custom base request.
+        if (baseRequest != null) {
+          // Update the request with what the system thinks the request should be;
+          // its up to the request if it wants to listen.
+          compaction.forceSelect(
+              baseRequest.combineWith(compaction.getRequest()));
         }
 
-        // no files to compact
-        if (filesToCompact.getFilesToCompact().isEmpty()) {
+        // Finally, we have the resulting files list. Check if we have any files at all.
+        final Collection<StoreFile> selectedFiles = compaction.getRequest().getFiles();
+        if (selectedFiles.isEmpty()) {
           return null;
         }
 
-        // basic sanity check: do not try to compact the same StoreFile twice.
-        if (!Collections.disjoint(filesCompacting, filesToCompact.getFilesToCompact())) {
+        // Update filesCompacting (check that we do not try to compact the same StoreFile twice).
+        if (!Collections.disjoint(filesCompacting, selectedFiles)) {
           // TODO: change this from an IAE to LOG.error after sufficient testing
           Preconditions.checkArgument(false, "%s overlaps with %s",
-              filesToCompact, filesCompacting);
+              selectedFiles, filesCompacting);
         }
-        filesCompacting.addAll(filesToCompact.getFilesToCompact());
+        filesCompacting.addAll(selectedFiles);
         Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
 
-        boolean isMajor =
-            (filesToCompact.getFilesToCompact().size() == this.getStorefilesCount());
-        if (isMajor) {
-          // since we're enqueuing a major, update the compaction wait interval
-          this.forceMajor = false;
-        }
-
-        LOG.debug(getRegionInfo().getEncodedName() + " - " +
-            getColumnFamilyName() + ": Initiating " +
-            (isMajor ? "major" : "minor") + " compaction");
-
-        // everything went better than expected. create a compaction request
-        int pri = getCompactPriority(priority);
-        //not a special compaction request, so we need to make one
-        if(request == null){
-          request = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
-        }else{
-          //update the request with what the system thinks the request should be
-          //its up to the request if it wants to listen
-          request.setSelection(filesToCompact);
-          request.setIsMajor(isMajor);
-          request.setPriority(pri);
-        }
+        // If we're enqueuing a major, clear the force flag.
+        boolean isMajor = selectedFiles.size() == this.getStorefilesCount();
+        this.forceMajor = this.forceMajor && !isMajor;
+
+        // Set common request properties.
+        compaction.getRequest().setPriority(getCompactPriority(priority));
+        compaction.getRequest().setIsMajor(isMajor);
+        compaction.getRequest().setDescription(
+            region.getRegionNameAsString(), getColumnFamilyName());
       }
     } finally {
       this.lock.readLock().unlock();
     }
-    if (request != null) {
-      this.region.reportCompactionRequestStart(request.isMajor());
-    }
-    return request;
+
+    LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating "
+        + (compaction.getRequest().isMajor() ? "major" : "minor") + " compaction");
+    this.region.reportCompactionRequestStart(compaction.getRequest().isMajor());
+    return compaction;
+  }
+
+  public void cancelRequestedCompaction(CompactionContext compaction) {
+    finishCompactionRequest(compaction.getRequest());
   }
 
-  public void finishRequest(CompactionRequest cr) {
+  private void finishCompactionRequest(CompactionRequest cr) {
     this.region.reportCompactionRequestEnd(cr.isMajor());
-    if (cr.getCompactSelection().isOffPeakCompaction()) {
+    if (cr.isOffPeak()) {
       this.offPeakCompactions.endOffPeakRequest();
-      cr.getCompactSelection().setOffPeak(false);
+      cr.setOffPeak(false);
     }
     synchronized (filesCompacting) {
       filesCompacting.removeAll(cr.getFiles());
@@ -1363,28 +1398,8 @@ public class HStore implements Store {
    * @return StoreFile created. May be null.
    * @throws IOException
    */
-  StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
-                                       final Path newFile)
-      throws IOException {
-    // 1. Moving the new files into place -- if there is a new file (may not
-    // be if all cells were expired or deleted).
-    StoreFile result = null;
-    if (newFile != null) {
-      validateStoreFile(newFile);
-      // Move the file into the right spot
-      Path destPath = new Path(homedir, newFile.getName());
-      LOG.info("Renaming compacted file at " + newFile + " to " + destPath);
-      if (!fs.rename(newFile, destPath)) {
-        LOG.error("Failed move of compacted file " + newFile + " to " +
-            destPath);
-        throw new IOException("Failed move of compacted file " + newFile +
-            " to " + destPath);
-      }
-      result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
-          this.family.getBloomFilterType(), this.dataBlockEncoder);
-      result.createReader();
-    }
-
+  private void completeCompaction(final Collection<StoreFile> compactedFiles,
+      final Collection<StoreFile> result) throws IOException {
     try {
       this.lock.writeLock().lock();
       try {
@@ -1392,11 +1407,7 @@ public class HStore implements Store {
         // delete old store files until we have sent out notification of
         // change in case old files are still being accessed by outstanding
         // scanners.
-        List<StoreFile> results = new ArrayList<StoreFile>(1);
-        if (result != null) {
-          results.add(result);
-        }
-        this.storeFileManager.addCompactionResults(compactedFiles, results);
+        this.storeFileManager.addCompactionResults(compactedFiles, result);
         filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock()
       } finally {
         // We need the lock, as long as we are updating the storeFiles
@@ -1418,8 +1429,8 @@ public class HStore implements Store {
     } catch (IOException e) {
       e = RemoteExceptionHandler.checkIOException(e);
       LOG.error("Failed replacing compacted files in " + this +
-        ". Compacted file is " + (result == null? "none": result.toString()) +
-        ".  Files replaced " + compactedFiles.toString() +
+        ". Compacted files are " + (result == null? "none": result.toString()) +
+        ". Files replaced " + compactedFiles.toString() +
         " some of which may have been already removed", e);
     }
 
@@ -1435,7 +1446,6 @@ public class HStore implements Store {
       this.storeSize += r.length();
       this.totalUncompressedBytes += r.getTotalUncompressedBytes();
     }
-    return result;
   }
 
   /*

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Tue Feb 26 20:51:59 2013
@@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.coprocess
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Feb 26 20:51:59 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.io.HeapSi
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 
@@ -157,12 +158,14 @@ public interface Store extends HeapSize,
    */
   public CompactionProgress getCompactionProgress();
 
-  public CompactionRequest requestCompaction() throws IOException;
+  public CompactionContext requestCompaction() throws IOException;
 
-  public CompactionRequest requestCompaction(int priority, CompactionRequest request)
+  public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
       throws IOException;
 
-  public void finishRequest(CompactionRequest cr);
+  public void cancelRequestedCompaction(CompactionContext compaction);
+
+  public List<StoreFile> compact(CompactionContext compaction) throws IOException;
 
   /**
    * @return true if we should run a major compaction.

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java Tue Feb 26 20:51:59 2013
@@ -24,8 +24,11 @@ import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 /**
@@ -34,14 +37,15 @@ import org.apache.hadoop.hbase.util.Refl
  * they are tied together and replaced together via StoreEngine-s.
  */
 @InterfaceAudience.Private
-public abstract class StoreEngine {
+public abstract class StoreEngine<
+  CP extends CompactionPolicy, C extends Compactor, SFM extends StoreFileManager> {
   protected final Store store;
   protected final Configuration conf;
   protected final KVComparator comparator;
 
-  private final PP<CompactionPolicy> compactionPolicy = new PP<CompactionPolicy>();
-  private final PP<Compactor> compactor = new PP<Compactor>();
-  private final PP<StoreFileManager> storeFileManager = new PP<StoreFileManager>();
+  protected CP compactionPolicy;
+  protected C compactor;
+  protected SFM storeFileManager;
   private boolean isInitialized = false;
 
   /**
@@ -50,7 +54,7 @@ public abstract class StoreEngine {
    */
   public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";
 
-  private static final Class<? extends StoreEngine>
+  private static final Class<? extends StoreEngine<?, ?, ?>>
     DEFAULT_STORE_ENGINE_CLASS = DefaultStoreEngine.class;
 
   /**
@@ -58,7 +62,7 @@ public abstract class StoreEngine {
    */
   public CompactionPolicy getCompactionPolicy() {
     createComponentsOnce();
-    return this.compactionPolicy.get();
+    return this.compactionPolicy;
   }
 
   /**
@@ -66,7 +70,7 @@ public abstract class StoreEngine {
    */
   public Compactor getCompactor() {
     createComponentsOnce();
-    return this.compactor.get();
+    return this.compactor;
   }
 
   /**
@@ -74,7 +78,7 @@ public abstract class StoreEngine {
    */
   public StoreFileManager getStoreFileManager() {
     createComponentsOnce();
-    return this.storeFileManager.get();
+    return this.storeFileManager;
   }
 
   protected StoreEngine(Configuration conf, Store store, KVComparator comparator) {
@@ -83,18 +87,22 @@ public abstract class StoreEngine {
     this.comparator = comparator;
   }
 
+  public CompactionContext createCompaction() {
+    createComponentsOnce();
+    return this.createCompactionContext();
+  }
+
+  protected abstract CompactionContext createCompactionContext();
+
   /**
    * Create the StoreEngine's components.
-   * @param storeFileManager out parameter for StoreFileManager.
-   * @param compactionPolicy out parameter for CompactionPolicy.
-   * @param compactor out parameter for Compactor.
    */
-  protected abstract void createComponents(PP<StoreFileManager> storeFileManager,
-      PP<CompactionPolicy> compactionPolicy, PP<Compactor> compactor);
+  protected abstract void createComponents();
 
   private void createComponentsOnce() {
     if (isInitialized) return;
-    createComponents(storeFileManager, compactionPolicy, compactor);
+    createComponents();
+    assert compactor != null && compactionPolicy != null && storeFileManager != null;
     isInitialized = true;
   }
 
@@ -117,18 +125,4 @@ public abstract class StoreEngine {
       throw new IOException("Unable to load configured store engine '" + className + "'", e);
     }
   }
-
-  /**
-   * To allow StoreEngine-s to have custom dependencies between 3 components, we want to create
-   * them in one place. To return multiple, simulate C++ pointer to pointers/C# out params.
-   */
-  protected static class PP<T> {
-    private T t = null;
-    public void set(T t) {
-      this.t = t;
-    }
-    public T get() {
-      return this.t;
-    }
-  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java Tue Feb 26 20:51:59 2013
@@ -39,7 +39,7 @@ import com.google.common.collect.Immutab
  * Implementations are assumed to be not thread safe.
  */
 @InterfaceAudience.Private
-interface StoreFileManager {
+public interface StoreFileManager {
   /**
    * Loads the initial store files into empty StoreFileManager.
    * @param storeFiles The files to load.
@@ -56,7 +56,6 @@ interface StoreFileManager {
    * Adds compaction results into the structure.
    * @param compactedFiles The input files for the compaction.
    * @param results The resulting files for the compaction.
-   * @return The files that can be removed from storage. Usually,
    */
   public abstract void addCompactionResults(
       Collection<StoreFile> compactedFiles, Collection<StoreFile> results);

Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java?rev=1450407&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java Tue Feb 26 20:51:59 2013
@@ -0,0 +1,80 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+
+
+/**
+ * This class holds all "physical" details necessary to run a compaction.
+ * It also has compaction request with all the logical details.
+ * Hence, this class is basically the compaction.
+ */
+@InterfaceAudience.Private
+public abstract class CompactionContext {
+  protected CompactionRequest request = null;
+
+  /**
+   * Called before coprocessor preCompactSelection and should filter the candidates
+   * for coprocessor; i.e. exclude the files that definitely cannot be compacted at this time.
+   * @param filesCompacting files currently compacting
+   * @return the list of files that can theoretically be compacted.
+   */
+  public abstract List<StoreFile> preSelect(final List<StoreFile> filesCompacting);
+
+  /**
+   * Called to select files for compaction. Must fill in the request field if successful.
+   * @param filesCompacting Files currently being compacted by other compactions.
+   * @param isUserCompaction Whether this is a user compaction.
+   * @param mayUseOffPeak Whether the underlying policy may assume it's off-peak hours.
+   * @param forceMajor Whether to force major compaction.
+   * @return Whether the selection succeeded. Selection may be empty and lead to no compaction.
+   */
+  public abstract boolean select(
+      final List<StoreFile> filesCompacting, final boolean isUserCompaction,
+      final boolean mayUseOffPeak, final boolean forceMajor) throws IOException;
+
+  /**
+   * Forces external selection to be applied for this compaction.
+   * @param request The pre-cooked request with selection and other settings.
+   */
+  public void forceSelect(CompactionRequest request) {
+    this.request = request;
+  }
+
+  /**
+   * Runs the compaction based on current selection. select/forceSelect must have been called.
+   * @return The new file paths resulting from compaction.
+   */
+  public abstract List<Path> compact() throws IOException;
+
+  public CompactionRequest getRequest() {
+    assert hasSelection();
+    return this.request;
+  }
+
+  public boolean hasSelection() {
+    return this.request != null;
+  }
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java Tue Feb 26 20:51:59 2013
@@ -43,25 +43,6 @@ public abstract class CompactionPolicy {
   }
 
   /**
-   * This is called before coprocessor preCompactSelection and should filter the candidates
-   * for coprocessor; i.e. exclude the files that definitely cannot be compacted at this time.
-   * @param candidateFiles candidate files, ordered from oldest to newest
-   * @param filesCompacting files currently compacting
-   * @return the list of files that can theoretically be compacted.
-   */
-  public abstract List<StoreFile> preSelectCompaction(
-      List<StoreFile> candidateFiles, final List<StoreFile> filesCompacting);
-
-  /**
-   * @param candidateFiles candidate files, ordered from oldest to newest
-   * @return subset copy of candidate list that meets compaction criteria
-   * @throws java.io.IOException
-   */
-  public abstract CompactSelection selectCompaction(
-    final List<StoreFile> candidateFiles, final boolean isUserCompaction,
-    final boolean mayUseOffPeak, final boolean forceMajor) throws IOException;
-
-  /**
    * @param storeFiles Store files in the store.
    * @return The system compaction priority of the store, based on storeFiles.
    *         The priority range is as such - the smaller values are higher priority;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Tue Feb 26 20:51:59 2013
@@ -18,22 +18,13 @@
  */
 package org.apache.hadoop.hbase.regionserver.compactions;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadPoolExecutor;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -46,231 +37,193 @@ import com.google.common.base.Predicate;
 import com.google.common.collect.Collections2;
 
 /**
- * This class holds all details necessary to run a compaction.
+ * This class holds all logical details necessary to run a compaction.
  */
 @InterfaceAudience.LimitedPrivate({ "coprocessor" })
 @InterfaceStability.Evolving
-public class CompactionRequest implements Comparable<CompactionRequest>,
-    Runnable {
-    static final Log LOG = LogFactory.getLog(CompactionRequest.class);
-    private final HRegion region;
-    private final HStore store;
-    private CompactSelection compactSelection;
-    private long totalSize;
-    private boolean isMajor;
-    private int priority;
-    private final Long timeInNanos;
-    private HRegionServer server = null;
-
-    public static CompactionRequest getRequestForTesting(Collection<StoreFile> selection,
-        boolean isMajor) {
-      return new CompactionRequest(null, null, new CompactSelection(new ArrayList<StoreFile>(
-        selection)), isMajor, 0, System.nanoTime());
-    }
-
-    /**
-     * Constructor for a custom compaction. Uses the setXXX methods to update the state of the
-     * compaction before being used.
-     */
-    public CompactionRequest(HRegion region, HStore store, int priority) {
-    this(region, store, null, false, priority, System
-        .nanoTime());
-    }
-
-    public CompactionRequest(HRegion r, HStore s, CompactSelection files, boolean isMajor, int p) {
-      // delegate to the internal constructor after checking basic preconditions
-      this(Preconditions.checkNotNull(r), s, Preconditions.checkNotNull(files), isMajor, p, System
-          .nanoTime());
-    }
-
-    private CompactionRequest(HRegion region, HStore store, CompactSelection files, boolean isMajor,
-        int priority, long startTime) {
-      this.region = region;
-      this.store = store;
-      this.isMajor = isMajor;
-      this.priority = priority;
-      this.timeInNanos = startTime;
-      if (files != null) {
-        this.setSelection(files);
-      }
-    }
-
-    /**
-     * This function will define where in the priority queue the request will
-     * end up.  Those with the highest priorities will be first.  When the
-     * priorities are the same it will first compare priority then date
-     * to maintain a FIFO functionality.
-     *
-     * <p>Note: The date is only accurate to the millisecond which means it is
-     * possible that two requests were inserted into the queue within a
-     * millisecond.  When that is the case this function will break the tie
-     * arbitrarily.
-     */
-    @Override
-    public int compareTo(CompactionRequest request) {
-      //NOTE: The head of the priority queue is the least element
-      if (this.equals(request)) {
-        return 0; //they are the same request
-      }
-      int compareVal;
-
-      compareVal = priority - request.priority; //compare priority
-      if (compareVal != 0) {
-        return compareVal;
-      }
-
-      compareVal = timeInNanos.compareTo(request.timeInNanos);
-      if (compareVal != 0) {
-        return compareVal;
-      }
-
-      // break the tie based on hash code
-      return this.hashCode() - request.hashCode();
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      return (this == obj);
-    }
-
-    /** Gets the HRegion for the request */
-    public HRegion getHRegion() {
-      return region;
-    }
-
-    /** Gets the Store for the request */
-    public HStore getStore() {
-      return store;
-    }
-
-    /** Gets the compact selection object for the request */
-    public CompactSelection getCompactSelection() {
-      return compactSelection;
-    }
-
-    /** Gets the StoreFiles for the request */
-    public List<StoreFile> getFiles() {
-      return compactSelection.getFilesToCompact();
-    }
-
-    /** Gets the total size of all StoreFiles in compaction */
-    public long getSize() {
-      return totalSize;
-    }
-
-    public boolean isMajor() {
-      return this.isMajor;
-    }
-
-    /** Gets the priority for the request */
-    public int getPriority() {
-      return priority;
-    }
-
-    public long getSelectionTime() {
-      return compactSelection.getSelectionTime();
-    }
-
-    /** Gets the priority for the request */
-    public void setPriority(int p) {
-      this.priority = p;
-    }
-
-    public void setServer(HRegionServer hrs) {
-      this.server = hrs;
-    }
-
-    /**
-     * Set the files (and, implicitly, the size of the compaction based on those files)
-     * @param files files that should be included in the compaction
-     */
-    public void setSelection(CompactSelection files) {
-      long sz = 0;
-      for (StoreFile sf : files.getFilesToCompact()) {
-        sz += sf.getReader().length();
-      }
-      this.totalSize = sz;
-      this.compactSelection = files;
-    }
-
-    /**
-     * Specify if this compaction should be a major compaction based on the state of the store
-     * @param isMajor <tt>true</tt> if the system determines that this compaction should be a major
-     *          compaction
-     */
-    public void setIsMajor(boolean isMajor) {
-      this.isMajor = isMajor;
-    }
-
-    @Override
-    public String toString() {
-      String fsList = Joiner.on(", ").join(
-          Collections2.transform(Collections2.filter(
-              compactSelection.getFilesToCompact(),
-              new Predicate<StoreFile>() {
-                public boolean apply(StoreFile sf) {
-                  return sf.getReader() != null;
-                }
-            }), new Function<StoreFile, String>() {
-              public String apply(StoreFile sf) {
-                return StringUtils.humanReadableInt(sf.getReader().length());
+public class CompactionRequest implements Comparable<CompactionRequest> {
+  static final Log LOG = LogFactory.getLog(CompactionRequest.class);
+  // was this compaction promoted to an off-peak
+  private boolean isOffPeak = false;
+  private boolean isMajor = false;
+  private int priority = Store.NO_PRIORITY;
+  private Collection<StoreFile> filesToCompact;
+
+  // CompactRequest object creation time.
+  private long selectionTime;
+  // System time used to compare objects in FIFO order. TODO: maybe use selectionTime?
+  private Long timeInNanos;
+  private String regionName = "";
+  private String storeName = "";
+  private long totalSize = -1L;
+
+  /**
+   * This ctor should be used by coprocessors that want to subclass CompactionRequest.
+   */
+  public CompactionRequest() {
+    this.selectionTime = EnvironmentEdgeManager.currentTimeMillis();
+    this.timeInNanos = System.nanoTime();
+  }
+
+  public CompactionRequest(Collection<StoreFile> files) {
+    this();
+    Preconditions.checkNotNull(files);
+    this.filesToCompact = files;
+    recalculateSize();
+  }
+
+  /**
+   * Called before compaction is executed by CompactSplitThread; for use by coproc subclasses.
+   */
+  public void beforeExecute() {}
+
+  /**
+   * Called after compaction is executed by CompactSplitThread; for use by coproc subclasses.
+   */
+  public void afterExecute() {}
+
+  /**
+   * Combines the request with other request. Coprocessors subclassing CR may override
+   * this if they want to do clever things based on CompactionPolicy selection that
+   * is passed to this method via "other". The default implementation just does a copy.
+   * @param other Request to combine with.
+   * @return The result (may be "this" or "other").
+   */
+  public CompactionRequest combineWith(CompactionRequest other) {
+    this.filesToCompact = new ArrayList<StoreFile>(other.getFiles());
+    this.isOffPeak = other.isOffPeak;
+    this.isMajor = other.isMajor;
+    this.priority = other.priority;
+    this.selectionTime = other.selectionTime;
+    this.timeInNanos = other.timeInNanos;
+    this.regionName = other.regionName;
+    this.storeName = other.storeName;
+    this.totalSize = other.totalSize;
+    return this;
+  }
+
+  /**
+   * This function will define where in the priority queue the request will
+   * end up.  Those with the highest priorities will be first.  When the
+   * priorities are the same it will first compare priority then date
+   * to maintain a FIFO functionality.
+   *
+   * <p>Note: The date is only accurate to the millisecond which means it is
+   * possible that two requests were inserted into the queue within a
+   * millisecond.  When that is the case this function will break the tie
+   * arbitrarily.
+   */
+  @Override
+  public int compareTo(CompactionRequest request) {
+    //NOTE: The head of the priority queue is the least element
+    if (this.equals(request)) {
+      return 0; //they are the same request
+    }
+    int compareVal;
+
+    compareVal = priority - request.priority; //compare priority
+    if (compareVal != 0) {
+      return compareVal;
+    }
+
+    compareVal = timeInNanos.compareTo(request.timeInNanos);
+    if (compareVal != 0) {
+      return compareVal;
+    }
+
+    // break the tie based on hash code
+    return this.hashCode() - request.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    return (this == obj);
+  }
+
+  public Collection<StoreFile> getFiles() {
+    return this.filesToCompact;
+  }
+
+  /**
+   * Sets the region/store name, for logging.
+   */
+  public void setDescription(String regionName, String storeName) {
+    this.regionName = regionName;
+    this.storeName = storeName;
+  }
+
+  /** Gets the total size of all StoreFiles in compaction */
+  public long getSize() {
+    return totalSize;
+  }
+
+  public boolean isMajor() {
+    return this.isMajor;
+  }
+
+  /** Gets the priority for the request */
+  public int getPriority() {
+    return priority;
+  }
+
+  /** Sets the priority for the request */
+  public void setPriority(int p) {
+    this.priority = p;
+  }
+
+  public boolean isOffPeak() {
+    return this.isOffPeak;
+  }
+
+  public void setOffPeak(boolean value) {
+    this.isOffPeak = value;
+  }
+
+  public long getSelectionTime() {
+    return this.selectionTime;
+  }
+
+  /**
+   * Specify if this compaction should be a major compaction based on the state of the store
+   * @param isMajor <tt>true</tt> if the system determines that this compaction should be a major
+   *          compaction
+   */
+  public void setIsMajor(boolean isMajor) {
+    this.isMajor = isMajor;
+  }
+
+  @Override
+  public String toString() {
+    String fsList = Joiner.on(", ").join(
+        Collections2.transform(Collections2.filter(
+            this.getFiles(),
+            new Predicate<StoreFile>() {
+              public boolean apply(StoreFile sf) {
+                return sf.getReader() != null;
               }
-            }));
-
-      return "regionName=" + region.getRegionNameAsString() +
-        ", storeName=" + new String(store.getFamily().getName()) +
-        ", fileCount=" + compactSelection.getFilesToCompact().size() +
-        ", fileSize=" + StringUtils.humanReadableInt(totalSize) +
-          ((fsList.isEmpty()) ? "" : " (" + fsList + ")") +
-        ", priority=" + priority + ", time=" + timeInNanos;
-    }
-
-    @Override
-    public void run() {
-      Preconditions.checkNotNull(server);
-      if (server.isStopped()) {
-        return;
-      }
-      try {
-        long start = EnvironmentEdgeManager.currentTimeMillis();
-        boolean completed = region.compact(this);
-        long now = EnvironmentEdgeManager.currentTimeMillis();
-        LOG.info(((completed) ? "completed" : "aborted") + " compaction: " +
-              this + "; duration=" + StringUtils.formatTimeDiff(now, start));
-        if (completed) {
-          // degenerate case: blocked regions require recursive enqueues
-          if (store.getCompactPriority() <= 0) {
-            server.compactSplitThread.requestCompaction(region, store, "Recursive enqueue", null);
-            } else {
-              // see if the compaction has caused us to exceed max region size
-          server.getCompactSplitThread().requestSplit(region);
+          }), new Function<StoreFile, String>() {
+            public String apply(StoreFile sf) {
+              return StringUtils.humanReadableInt(sf.getReader().length());
             }
-        }
-      } catch (IOException ex) {
-        LOG.error("Compaction failed " + this, RemoteExceptionHandler
-            .checkIOException(ex));
-        server.checkFileSystem();
-      } catch (Exception ex) {
-        LOG.error("Compaction failed " + this, ex);
-        server.checkFileSystem();
-      } finally {
-        store.finishRequest(this);
-        LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
-      }
-    }
+          }));
 
-    /**
-     * Cleanup class to use when rejecting a compaction request from the queue.
-     */
-    public static class Rejection implements RejectedExecutionHandler {
-
-      @Override
-      public void rejectedExecution(Runnable request, ThreadPoolExecutor pool) {
-        if (request instanceof CompactionRequest) {
-          CompactionRequest cr = (CompactionRequest) request;
-          LOG.debug("Compaction Rejected: " + cr);
-          cr.getStore().finishRequest(cr);
-        }
-      }
+    return "regionName=" + regionName + ", storeName=" + storeName +
+      ", fileCount=" + this.getFiles().size() +
+      ", fileSize=" + StringUtils.humanReadableInt(totalSize) +
+        ((fsList.isEmpty()) ? "" : " (" + fsList + ")") +
+      ", priority=" + priority + ", time=" + timeInNanos;
+  }
+
+  /**
+   * Recalculate the size of the compaction based on current files.
+   * @param files files that should be included in the compaction
+   */
+  private void recalculateSize() {
+    long sz = 0;
+    for (StoreFile sf : this.filesToCompact) {
+      sz += sf.getReader().length();
     }
+    this.totalSize = sz;
+  }
 }
+

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java Tue Feb 26 20:51:59 2013
@@ -60,7 +60,9 @@ public abstract class Compactor {
    */
   public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
       throws IOException {
-    return compact(CompactionRequest.getRequestForTesting(filesToCompact, isMajor));
+    CompactionRequest cr = new CompactionRequest(filesToCompact);
+    cr.setIsMajor(isMajor);
+    return this.compact(cr);
   }
 
   public CompactionProgress getProgress() {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java Tue Feb 26 20:51:59 2013
@@ -32,9 +32,11 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
 import org.apache.hadoop.hbase.regionserver.HStore;
 import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileManager;
 import org.apache.hadoop.hbase.regionserver.StoreUtils;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
@@ -49,16 +51,15 @@ import com.google.common.collect.Collect
  */
 @InterfaceAudience.Private
 public class DefaultCompactionPolicy extends CompactionPolicy {
-
   private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicy.class);
 
   public DefaultCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
     super(conf, storeConfigInfo);
   }
 
-  @Override
-  public List<StoreFile> preSelectCompaction(
-      List<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
+
+  private ArrayList<StoreFile> getCurrentEligibleFiles(
+      ArrayList<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
     // candidates = all storefiles not already in compaction queue
     if (!filesCompacting.isEmpty()) {
       // exclude all files older than the newest file we're currently
@@ -71,6 +72,11 @@ public class DefaultCompactionPolicy ext
     return candidateFiles;
   }
 
+  public List<StoreFile> preSelectCompactionForCoprocessor(
+      final Collection<StoreFile> candidates, final List<StoreFile> filesCompacting) {
+    return getCurrentEligibleFiles(new ArrayList<StoreFile>(candidates), filesCompacting);
+  }
+
   @Override
   public int getSystemCompactionPriority(final Collection<StoreFile> storeFiles) {
     return this.comConf.getBlockingStorefileCount() - storeFiles.size();
@@ -81,20 +87,20 @@ public class DefaultCompactionPolicy ext
    * @return subset copy of candidate list that meets compaction criteria
    * @throws java.io.IOException
    */
-  @Override
-  public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
-      final boolean isUserCompaction, final boolean mayUseOffPeak, final boolean forceMajor)
-    throws IOException {
+  public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
+      final List<StoreFile> filesCompacting, final boolean isUserCompaction,
+      final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
     // Preliminary compaction subject to filters
-    CompactSelection candidateSelection = new CompactSelection(candidateFiles);
+    ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
+    candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
     long cfTtl = this.storeConfigInfo.getStoreFileTtl();
     if (!forceMajor) {
       // If there are expired files, only select them so that compaction deletes them
       if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
-        CompactSelection expiredSelection = selectExpiredStoreFiles(
-          candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
+        ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
+            candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
         if (expiredSelection != null) {
-          return expiredSelection;
+          return new CompactionRequest(expiredSelection);
         }
       }
       candidateSelection = skipLargeFiles(candidateSelection);
@@ -106,21 +112,23 @@ public class DefaultCompactionPolicy ext
     // Or, if there are any references among the candidates.
     boolean majorCompaction = (
       (forceMajor && isUserCompaction)
-      || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact()))
-          && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()))
-      || StoreUtils.hasReferences(candidateSelection.getFilesToCompact())
+      || ((forceMajor || isMajorCompaction(candidateSelection))
+          && (candidateSelection.size() < comConf.getMaxFilesToCompact()))
+      || StoreUtils.hasReferences(candidateSelection)
       );
 
     if (!majorCompaction) {
       // we're doing a minor compaction, let's see what files are applicable
-      candidateSelection.setOffPeak(mayUseOffPeak);
       candidateSelection = filterBulk(candidateSelection);
-      candidateSelection = applyCompactionPolicy(candidateSelection);
+      candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak);
       candidateSelection = checkMinFilesCriteria(candidateSelection);
     }
-    candidateSelection =
-        removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
-    return candidateSelection;
+    candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
+    CompactionRequest result = new CompactionRequest(candidateSelection);
+    if (!majorCompaction && !candidateSelection.isEmpty()) {
+      result.setOffPeak(mayUseOffPeak);
+    }
+    return result;
   }
 
   /**
@@ -133,33 +141,25 @@ public class DefaultCompactionPolicy ext
    * @return A CompactSelection contains the expired store files as
    *         filesToCompact
    */
-  private CompactSelection selectExpiredStoreFiles(
-      CompactSelection candidates, long maxExpiredTimeStamp) {
-    List<StoreFile> filesToCompact = candidates.getFilesToCompact();
-    if (filesToCompact == null || filesToCompact.size() == 0)
-      return null;
+  private ArrayList<StoreFile> selectExpiredStoreFiles(
+      ArrayList<StoreFile> candidates, long maxExpiredTimeStamp) {
+    if (candidates == null || candidates.size() == 0) return null;
     ArrayList<StoreFile> expiredStoreFiles = null;
-    boolean hasExpiredStoreFiles = false;
-    CompactSelection expiredSFSelection = null;
 
-    for (StoreFile storeFile : filesToCompact) {
+    for (StoreFile storeFile : candidates) {
       if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
         LOG.info("Deleting the expired store file by compaction: "
             + storeFile.getPath() + " whose maxTimeStamp is "
             + storeFile.getReader().getMaxTimestamp()
             + " while the max expired timestamp is " + maxExpiredTimeStamp);
-        if (!hasExpiredStoreFiles) {
+        if (expiredStoreFiles == null) {
           expiredStoreFiles = new ArrayList<StoreFile>();
-          hasExpiredStoreFiles = true;
         }
         expiredStoreFiles.add(storeFile);
       }
     }
 
-    if (hasExpiredStoreFiles) {
-      expiredSFSelection = new CompactSelection(expiredStoreFiles);
-    }
-    return expiredSFSelection;
+    return expiredStoreFiles;
   }
 
   /**
@@ -168,18 +168,16 @@ public class DefaultCompactionPolicy ext
    * exclude all files above maxCompactSize
    * Also save all references. We MUST compact them
    */
-  private CompactSelection skipLargeFiles(CompactSelection candidates) {
+  private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates) {
     int pos = 0;
-    while (pos < candidates.getFilesToCompact().size() &&
-      candidates.getFilesToCompact().get(pos).getReader().length() >
-        comConf.getMaxCompactSize() &&
-      !candidates.getFilesToCompact().get(pos).isReference()) {
+    while (pos < candidates.size() && !candidates.get(pos).isReference()
+      && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize())) {
       ++pos;
     }
     if (pos > 0) {
       LOG.debug("Some files are too large. Excluding " + pos
           + " files from compaction candidates");
-      candidates.clearSubList(0, pos);
+      candidates.subList(0, pos).clear();
     }
     return candidates;
   }
@@ -189,9 +187,8 @@ public class DefaultCompactionPolicy ext
    * @return filtered subset
    * exclude all bulk load files if configured
    */
-  private CompactSelection filterBulk(CompactSelection candidates) {
-    candidates.getFilesToCompact().removeAll(Collections2.filter(
-        candidates.getFilesToCompact(),
+  private ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
+    candidates.removeAll(Collections2.filter(candidates,
         new Predicate<StoreFile>() {
           @Override
           public boolean apply(StoreFile input) {
@@ -206,9 +203,9 @@ public class DefaultCompactionPolicy ext
    * @return filtered subset
    * take upto maxFilesToCompact from the start
    */
-  private CompactSelection removeExcessFiles(CompactSelection candidates,
+  private ArrayList<StoreFile> removeExcessFiles(ArrayList<StoreFile> candidates,
       boolean isUserCompaction, boolean isMajorCompaction) {
-    int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
+    int excess = candidates.size() - comConf.getMaxFilesToCompact();
     if (excess > 0) {
       if (isMajorCompaction && isUserCompaction) {
         LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
@@ -216,8 +213,7 @@ public class DefaultCompactionPolicy ext
       } else {
         LOG.debug("Too many admissible files. Excluding " + excess
           + " files from compaction candidates");
-        candidates.clearSubList(comConf.getMaxFilesToCompact(),
-          candidates.getFilesToCompact().size());
+        candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
       }
     }
     return candidates;
@@ -227,16 +223,14 @@ public class DefaultCompactionPolicy ext
    * @return filtered subset
    * forget the compactionSelection if we don't have enough files
    */
-  private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
+  private ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
     int minFiles = comConf.getMinFilesToCompact();
-    if (candidates.getFilesToCompact().size() < minFiles) {
+    if (candidates.size() < minFiles) {
       if(LOG.isDebugEnabled()) {
-        LOG.debug("Not compacting files because we only have " +
-            candidates.getFilesToCompact().size() +
-          " files ready for compaction.  Need " + minFiles + " to initiate.");
+        LOG.debug("Not compacting files because we only have " + candidates.size() +
+          " files ready for compaction. Need " + minFiles + " to initiate.");
       }
-      candidates.emptyFileList();
-      candidates.setOffPeak(false);
+      candidates.clear();
     }
     return candidates;
   }
@@ -271,25 +265,26 @@ public class DefaultCompactionPolicy ext
     *    | |  | |  | |  | | | | | |
     *    | |  | |  | |  | | | | | |
     */
-  CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
-    if (candidates.getFilesToCompact().isEmpty()) {
+  ArrayList<StoreFile> applyCompactionPolicy(
+      ArrayList<StoreFile> candidates, boolean mayUseOffPeak) throws IOException {
+    if (candidates.isEmpty()) {
       return candidates;
     }
 
     // we're doing a minor compaction, let's see what files are applicable
     int start = 0;
     double ratio = comConf.getCompactionRatio();
-    if (candidates.isOffPeakCompaction()) {
+    if (mayUseOffPeak) {
       ratio = comConf.getCompactionRatioOffPeak();
       LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
     }
 
     // get store file sizes for incremental compacting selection.
-    int countOfFiles = candidates.getFilesToCompact().size();
+    final int countOfFiles = candidates.size();
     long[] fileSizes = new long[countOfFiles];
     long[] sumSize = new long[countOfFiles];
     for (int i = countOfFiles - 1; i >= 0; --i) {
-      StoreFile file = candidates.getFilesToCompact().get(i);
+      StoreFile file = candidates.get(i);
       fileSizes[i] = file.getReader().length();
       // calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
       int tooFar = i + comConf.getMaxFilesToCompact() - 1;
@@ -309,8 +304,9 @@ public class DefaultCompactionPolicy ext
         + " files from " + countOfFiles + " candidates");
     }
 
-    candidates = candidates.getSubList(start, countOfFiles);
-
+    if (start > 0) {
+      candidates.subList(0, start).clear();
+    }
     return candidates;
   }
 

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Tue Feb 26 20:51:59 2013
@@ -594,7 +594,7 @@ public class TestCompaction extends HBas
     HStore store = (HStore) r.getStore(COLUMN_FAMILY);
 
     Collection<StoreFile> storeFiles = store.getStorefiles();
-    Compactor tool = store.compactor;
+    Compactor tool = store.storeEngine.getCompactor();
 
     List<Path> newFiles = tool.compactForTesting(storeFiles, false);
 
@@ -611,7 +611,7 @@ public class TestCompaction extends HBas
     stream.close();
 
     try {
-      store.completeCompaction(storeFiles, origPath);
+      ((HStore)store).moveFileIntoPlace(origPath);
     } catch (Exception e) {
       // The complete compaction should fail and the corrupt file should remain
       // in the 'tmp' directory;
@@ -635,7 +635,7 @@ public class TestCompaction extends HBas
     }
     store.triggerMajorCompaction();
 
-    CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null);
+    CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null).getRequest();
     assertNotNull("Expected to receive a compaction request", request);
     assertEquals(
       "System-requested major compaction should not occur if there are too many store files",
@@ -653,7 +653,7 @@ public class TestCompaction extends HBas
       createStoreFile(r);
     }
     store.triggerMajorCompaction();
-    CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null);
+    CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null).getRequest();
     assertNotNull("Expected to receive a compaction request", request);
     assertEquals(
       "User-requested major compaction should always occur, even if there are too many store files",
@@ -680,7 +680,7 @@ public class TestCompaction extends HBas
     }
 
     CountDownLatch latch = new CountDownLatch(1);
-    TrackableCompactionRequest request = new TrackableCompactionRequest(r, (HStore) store, latch);
+    TrackableCompactionRequest request = new TrackableCompactionRequest(latch);
     thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request);
     // wait for the latch to complete.
     latch.await();
@@ -698,16 +698,15 @@ public class TestCompaction extends HBas
      * Constructor for a custom compaction. Uses the setXXX methods to update the state of the
      * compaction before being used.
      */
-    public TrackableCompactionRequest(HRegion region, HStore store, CountDownLatch finished) {
-      super(region, store, Store.PRIORITY_USER);
+    public TrackableCompactionRequest(CountDownLatch finished) {
+      super();
       this.done = finished;
     }
 
     @Override
-    public void run() {
-      super.run();
+    public void afterExecute() {
+      super.afterExecute();
       this.done.countDown();
     }
   }
-
 }