You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jy...@apache.org on 2013/02/20 23:40:55 UTC

svn commit: r1448449 - in /hbase/trunk/hbase-server/src: main/java/org/apache/hadoop/hbase/coprocessor/ main/java/org/apache/hadoop/hbase/regionserver/ main/java/org/apache/hadoop/hbase/regionserver/compactions/ test/java/org/apache/hadoop/hbase/region...

Author: jyates
Date: Wed Feb 20 22:40:55 2013
New Revision: 1448449

URL: http://svn.apache.org/r1448449
Log:
HBASE-7725: Add ability to create custom compaction request

Modified:
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java Wed Feb 20 22:40:55 2013
@@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Pair;
@@ -135,28 +137,63 @@ public abstract class BaseRegionObserver
       final Store store, final List<StoreFile> candidates) throws IOException { }
 
   @Override
+  public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Store store, final List<StoreFile> candidates, final CompactionRequest request)
+      throws IOException {
+    preCompactSelection(c, store, candidates);
+  }
+
+  @Override
   public void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Store store, final ImmutableList<StoreFile> selected) { }
 
   @Override
+  public void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Store store, final ImmutableList<StoreFile> selected, CompactionRequest request) {
+    postCompactSelection(c, store, selected);
+  }
+
+  @Override
   public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
       final Store store, final InternalScanner scanner, final ScanType scanType)
-          throws IOException {
+      throws IOException {
     return scanner;
   }
 
   @Override
-  public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
-      final long earliestPutTs, final InternalScanner s) throws IOException {
+  public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
+      final Store store, final InternalScanner scanner, final ScanType scanType,
+      CompactionRequest request) throws IOException {
+    return preCompact(e, store, scanner, scanType);
+  }
+
+  @Override
+  public InternalScanner preCompactScannerOpen(
+      final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+      List<? extends KeyValueScanner> scanners, final ScanType scanType, final long earliestPutTs,
+      final InternalScanner s) throws IOException {
     return null;
   }
 
   @Override
+  public InternalScanner preCompactScannerOpen(
+      final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+      List<? extends KeyValueScanner> scanners, final ScanType scanType, final long earliestPutTs,
+      final InternalScanner s, CompactionRequest request) throws IOException {
+    return preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s);
+  }
+
+  @Override
   public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
       final StoreFile resultFile) throws IOException {
   }
 
+@Override
+  public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
+      final StoreFile resultFile, CompactionRequest request) throws IOException {
+    postCompact(e, store, resultFile);
+  }
+
   @Override
   public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> e,
       final byte [] row, final byte [] family, final Result result)
@@ -351,4 +388,4 @@ public abstract class BaseRegionObserver
     List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
     return hasLoaded;
   }
-}
+}
\ No newline at end of file

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Wed Feb 20 22:40:55 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 
@@ -126,87 +127,184 @@ public interface RegionObserver extends 
       final StoreFile resultFile) throws IOException;
 
   /**
-   * Called prior to selecting the {@link StoreFile}s to compact from the list
-   * of available candidates.  To alter the files used for compaction, you may
-   * mutate the passed in list of candidates.
+   * Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of
+   * available candidates. To alter the files used for compaction, you may mutate the passed in list
+   * of candidates.
    * @param c the environment provided by the region server
    * @param store the store where compaction is being requested
    * @param candidates the store files currently available for compaction
+   * @param request custom compaction request
    * @throws IOException if an error occurred on the coprocessor
    */
   void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Store store, final List<StoreFile> candidates, final CompactionRequest request)
+      throws IOException;
+
+  /**
+   * Called prior to selecting the {@link StoreFile}s to compact from the list of available
+   * candidates. To alter the files used for compaction, you may mutate the passed in list of
+   * candidates.
+   * @param c the environment provided by the region server
+   * @param store the store where compaction is being requested
+   * @param candidates the store files currently available for compaction
+   * @throws IOException if an error occurred on the coprocessor
+   * @deprecated Use {@link #preCompactSelection(ObserverContext, Store, List, CompactionRequest)} instead
+   */
+  void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Store store, final List<StoreFile> candidates) throws IOException;
 
   /**
-   * Called after the {@link StoreFile}s to compact have been selected from the
-   * available candidates.
+   * Called after the {@link StoreFile}s to compact have been selected from the available
+   * candidates.
+   * @param c the environment provided by the region server
+   * @param store the store being compacted
+   * @param selected the store files selected to compact
+   * @param request custom compaction request
+   */
+  void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Store store, final ImmutableList<StoreFile> selected, CompactionRequest request);
+
+  /**
+   * Called after the {@link StoreFile}s to compact have been selected from the available
+   * candidates.
    * @param c the environment provided by the region server
    * @param store the store being compacted
    * @param selected the store files selected to compact
+   * @deprecated use
+   *             {@link #postCompactSelection(ObserverContext, Store, ImmutableList, CompactionRequest)}
+   *             instead.
    */
+  @Deprecated
   void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Store store, final ImmutableList<StoreFile> selected);
 
   /**
-   * Called prior to writing the {@link StoreFile}s selected for compaction into
-   * a new {@code StoreFile}.  To override or modify the compaction process,
-   * implementing classes have two options:
+   * Called prior to writing the {@link StoreFile}s selected for compaction into a new
+   * {@code StoreFile}. To override or modify the compaction process, implementing classes have two
+   * options:
    * <ul>
-   *   <li>Wrap the provided {@link InternalScanner} with a custom
-   *   implementation that is returned from this method.  The custom scanner
-   *   can then inspect {@link KeyValue}s from the wrapped scanner, applying
-   *   its own policy to what gets written.</li>
-   *   <li>Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()}
-   *   and provide a custom implementation for writing of new
-   *   {@link StoreFile}s.  <strong>Note: any implementations bypassing
-   *   core compaction using this approach must write out new store files
-   *   themselves or the existing data will no longer be available after
-   *   compaction.</strong></li>
+   * <li>Wrap the provided {@link InternalScanner} with a custom implementation that is returned
+   * from this method. The custom scanner can then inspect {@link KeyValue}s from the wrapped
+   * scanner, applying its own policy to what gets written.</li>
+   * <li>Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} and provide a
+   * custom implementation for writing of new {@link StoreFile}s. <strong>Note: any implementations
+   * bypassing core compaction using this approach must write out new store files themselves or the
+   * existing data will no longer be available after compaction.</strong></li>
    * </ul>
    * @param c the environment provided by the region server
    * @param store the store being compacted
-   * @param scanner the scanner over existing data used in the store file
-   * rewriting
+   * @param scanner the scanner over existing data used in the store file rewriting
    * @param scanType type of Scan
-   * @return the scanner to use during compaction.  Should not be {@code null}
-   * unless the implementation is writing new store files on its own.
+   * @param request the requested compaction
+   * @return the scanner to use during compaction. Should not be {@code null} unless the
+   *         implementation is writing new store files on its own.
    * @throws IOException if an error occurred on the coprocessor
    */
   InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
-      final Store store, final InternalScanner scanner,
-      final ScanType scanType) throws IOException;
+      final Store store, final InternalScanner scanner, final ScanType scanType,
+      CompactionRequest request) throws IOException;
 
   /**
-   * Called prior to writing the {@link StoreFile}s selected for compaction into
-   * a new {@code StoreFile} and prior to creating the scanner used to read the
-   * input files.  To override or modify the compaction process,
-   * implementing classes can return a new scanner to provide the KeyValues to be
-   * stored into the new {@code StoreFile} or null to perform the default processing.
-   * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
+   * Called prior to writing the {@link StoreFile}s selected for compaction into a new
+   * {@code StoreFile}. To override or modify the compaction process, implementing classes have two
+   * options:
+   * <ul>
+   * <li>Wrap the provided {@link InternalScanner} with a custom implementation that is returned
+   * from this method. The custom scanner can then inspect {@link KeyValue}s from the wrapped
+   * scanner, applying its own policy to what gets written.</li>
+   * <li>Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} and provide a
+   * custom implementation for writing of new {@link StoreFile}s. <strong>Note: any implementations
+   * bypassing core compaction using this approach must write out new store files themselves or the
+   * existing data will no longer be available after compaction.</strong></li>
+   * </ul>
+   * @param c the environment provided by the region server
+   * @param store the store being compacted
+   * @param scanner the scanner over existing data used in the store file rewriting
+   * @param scanType type of Scan
+   * @param request the requested compaction
+   * @return the scanner to use during compaction. Should not be {@code null} unless the
+   *         implementation is writing new store files on its own.
+   * @throws IOException if an error occurred on the coprocessor
+   * @deprecated use
+   *             {@link #preCompact(ObserverContext, Store, InternalScanner, ScanType, CompactionRequest)}
+   *             instead
+   */
+  @Deprecated
+  InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Store store, final InternalScanner scanner, final ScanType scanType) throws IOException;
+
+  /**
+   * Called prior to writing the {@link StoreFile}s selected for compaction into a new
+   * {@code StoreFile} and prior to creating the scanner used to read the input files. To override
+   * or modify the compaction process, implementing classes can return a new scanner to provide the
+   * KeyValues to be stored into the new {@code StoreFile} or null to perform the default
+   * processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
    * effect in this hook.
    * @param c the environment provided by the region server
    * @param store the store being compacted
    * @param scanners the list {@link StoreFileScanner}s to be read from
    * @param scanType the {@link ScanType} indicating whether this is a major or minor compaction
-   * @param earliestPutTs timestamp of the earliest put that was found in any of the involved
-   * store files
+   * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store
+   *          files
    * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
-   * @return the scanner to use during compaction.  {@code null} if the default implementation
-   * is to be used.
+   * @param request the requested compaction
+   * @return the scanner to use during compaction. {@code null} if the default implementation is to
+   *         be used.
    * @throws IOException if an error occurred on the coprocessor
    */
   InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
       final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
+      final long earliestPutTs, final InternalScanner s, CompactionRequest request)
+      throws IOException;
+
+  /**
+   * Called prior to writing the {@link StoreFile}s selected for compaction into a new
+   * {@code StoreFile} and prior to creating the scanner used to read the input files. To override
+   * or modify the compaction process, implementing classes can return a new scanner to provide the
+   * KeyValues to be stored into the new {@code StoreFile} or null to perform the default
+   * processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
+   * effect in this hook.
+   * @param c the environment provided by the region server
+   * @param store the store being compacted
+   * @param scanners the list {@link StoreFileScanner}s to be read from
+   * @param scanType the {@link ScanType} indicating whether this is a major or minor compaction
+   * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store
+   *          files
+   * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
+   * @param request the requested compaction
+   * @return the scanner to use during compaction. {@code null} if the default implementation is to
+   *         be used.
+   * @throws IOException if an error occurred on the coprocessor
+   * @deprecated Use
+   *             {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
+   *             instead.
+   */
+  @Deprecated
+  InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+      final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
       final long earliestPutTs, final InternalScanner s) throws IOException;
 
   /**
-   * Called after compaction has completed and the new store file has been
-   * moved in to place.
+   * Called after compaction has completed and the new store file has been moved in to place.
+   * @param c the environment provided by the region server
+   * @param store the store being compacted
+   * @param resultFile the new store file written out during compaction
+   * @param request the requested compaction
+   * @throws IOException if an error occurred on the coprocessor
+   */
+  void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+      StoreFile resultFile, CompactionRequest request) throws IOException;
+
+  /**
+   * Called after compaction has completed and the new store file has been moved in to place.
    * @param c the environment provided by the region server
    * @param store the store being compacted
    * @param resultFile the new store file written out during compaction
    * @throws IOException if an error occurred on the coprocessor
+   * @deprecated Use {@link #postCompact(ObserverContext, Store, StoreFile, CompactionRequest)}
+   *             instead
    */
+  @Deprecated
   void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
       StoreFile resultFile) throws IOException;
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Wed Feb 20 22:40:55 2013
@@ -19,7 +19,9 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -183,23 +185,41 @@ public class CompactSplitThread implemen
     }
   }
 
-  public synchronized void requestCompaction(final HRegion r,
-      final String why) throws IOException {
-    for (Store s : r.getStores().values()) {
-      requestCompaction(r, s, why, Store.NO_PRIORITY);
-    }
+  @Override
+  public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why)
+      throws IOException {
+    return requestCompaction(r, why, null);
+  }
+
+  @Override
+  public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
+      List<CompactionRequest> requests) throws IOException {
+    return requestCompaction(r, why, Store.NO_PRIORITY, requests);
   }
 
-  public synchronized void requestCompaction(final HRegion r, final Store s,
-      final String why) throws IOException {
-    requestCompaction(r, s, why, Store.NO_PRIORITY);
+  @Override
+  public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
+      final String why, CompactionRequest request) throws IOException {
+    return requestCompaction(r, s, why, Store.NO_PRIORITY, request);
   }
 
-  public synchronized void requestCompaction(final HRegion r, final String why,
-      int p) throws IOException {
-    for (Store s : r.getStores().values()) {
-      requestCompaction(r, s, why, p);
+  @Override
+  public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
+      int p, List<CompactionRequest> requests) throws IOException {
+    // null 'requests' => compact every store; otherwise submit each custom request to its store
+    List<CompactionRequest> ret;
+    if (requests == null) {
+      ret = new ArrayList<CompactionRequest>(r.getStores().size());
+      for (Store s : r.getStores().values()) {
+        ret.add(requestCompaction(r, s, why, p, null));
+      }
+    } else {
+      ret = new ArrayList<CompactionRequest>(requests.size());
+      for (CompactionRequest request : requests) {
+        ret.add(requestCompaction(r, request.getStore(), why, p, request));
+      }
     }
+    return ret;
   }
 
   /**
@@ -207,13 +227,15 @@ public class CompactSplitThread implemen
    * @param s Store to request compaction on
    * @param why Why compaction requested -- used in debug messages
    * @param priority override the default priority (NO_PRIORITY == decide)
+   * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
+   *          compaction will be used.
    */
-  public synchronized void requestCompaction(final HRegion r, final Store s,
-      final String why, int priority) throws IOException {
+  public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
+      final String why, int priority, CompactionRequest request) throws IOException {
     if (this.server.isStopped()) {
-      return;
+      return null;
     }
-    CompactionRequest cr = s.requestCompaction(priority);
+    CompactionRequest cr = s.requestCompaction(priority, request);
     if (cr != null) {
       cr.setServer(server);
       if (priority != Store.NO_PRIORITY) {
@@ -234,6 +256,7 @@ public class CompactSplitThread implemen
             " because compaction request was cancelled");
       }
     }
+    return cr;
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java Wed Feb 20 22:40:55 2013
@@ -19,42 +19,73 @@
 package org.apache.hadoop.hbase.regionserver;
 
 import java.io.IOException;
+import java.util.List;
+
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 
 @InterfaceAudience.Private
 public interface CompactionRequestor {
   /**
    * @param r Region to compact
    * @param why Why compaction was requested -- used in debug messages
+   * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
+   *         compactions were started
    * @throws IOException
    */
-  public void requestCompaction(final HRegion r, final String why) throws IOException;
+  public List<CompactionRequest> requestCompaction(final HRegion r, final String why)
+      throws IOException;
 
   /**
    * @param r Region to compact
-   * @param s Store within region to compact
    * @param why Why compaction was requested -- used in debug messages
+   * @param requests custom compaction requests. Each compaction must specify the store on which it
+   *          is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all
+   *          stores for the region.
+   * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
+   *         compactions were started
    * @throws IOException
    */
-  public void requestCompaction(final HRegion r, final Store s, final String why)
+  public List<CompactionRequest> requestCompaction(final HRegion r, final String why,
+      List<CompactionRequest> requests)
       throws IOException;
 
   /**
    * @param r Region to compact
+   * @param s Store within region to compact
+   * @param why Why compaction was requested -- used in debug messages
+   * @param request custom compaction request for the {@link HRegion} and {@link Store}. Custom
+   *          request must be <tt>null</tt> or be constructed with matching region and store.
+   * @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction was started.
+   * @throws IOException
+   */
+  public CompactionRequest requestCompaction(final HRegion r, final Store s, final String why,
+      CompactionRequest request) throws IOException;
+
+  /**
+   * @param r Region to compact
    * @param why Why compaction was requested -- used in debug messages
    * @param pri Priority of this compaction. minHeap. <=0 is critical
+   * @param requests custom compaction requests. Each compaction must specify the store on which it
+   *          is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all
+   *          stores for the region.
+   * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
+   *         compactions were started.
    * @throws IOException
    */
-  public void requestCompaction(final HRegion r, final String why, int pri) throws IOException;
+  public List<CompactionRequest> requestCompaction(final HRegion r, final String why, int pri,
+      List<CompactionRequest> requests) throws IOException;
 
   /**
    * @param r Region to compact
    * @param s Store within region to compact
    * @param why Why compaction was requested -- used in debug messages
    * @param pri Priority of this compaction. minHeap. <=0 is critical
+   * @param request custom compaction request to run. {@link Store} and {@link HRegion} for the
+   *          request must match the region and store specified here.
+   * @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction was started
    * @throws IOException
    */
-  public void requestCompaction(final HRegion r, final Store s,
-      final String why, int pri) throws IOException;
-
+  public CompactionRequest requestCompaction(final HRegion r, final Store s, final String why,
+      int pri, CompactionRequest request) throws IOException;
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Feb 20 22:40:55 2013
@@ -1348,17 +1348,17 @@ public class HRegionServer implements Cl
           try {
             if (s.needsCompaction()) {
               // Queue a compaction. Will recognize if major is needed.
-              this.instance.compactSplitThread.requestCompaction(r, s,
-                getName() + " requests compaction");
+              this.instance.compactSplitThread.requestCompaction(r, s, getName()
+                  + " requests compaction", null);
             } else if (s.isMajorCompaction()) {
-              if (majorCompactPriority == DEFAULT_PRIORITY ||
-                  majorCompactPriority > r.getCompactPriority()) {
-                this.instance.compactSplitThread.requestCompaction(r, s,
-                    getName() + " requests major compaction; use default priority");
+              if (majorCompactPriority == DEFAULT_PRIORITY
+                  || majorCompactPriority > r.getCompactPriority()) {
+                this.instance.compactSplitThread.requestCompaction(r, s, getName()
+                    + " requests major compaction; use default priority", null);
               } else {
-               this.instance.compactSplitThread.requestCompaction(r, s,
-                  getName() + " requests major compaction; use configured priority",
-                  this.majorCompactPriority);
+                this.instance.compactSplitThread.requestCompaction(r, s, getName()
+                    + " requests major compaction; use configured priority",
+                  this.majorCompactPriority, null);
               }
             }
           } catch (IOException e) {
@@ -1665,7 +1665,7 @@ public class HRegionServer implements Cl
     // Do checks to see if we need to compact (references or too many files)
     for (Store s : r.getStores().values()) {
       if (s.hasReferences() || s.needsCompaction()) {
-        getCompactionRequester().requestCompaction(r, s, "Opening Region");
+        getCompactionRequester().requestCompaction(r, s, "Opening Region", null);
       }
     }
     long openSeqNum = r.getOpenSeqNum();
@@ -3630,10 +3630,10 @@ public class HRegionServer implements Cl
       String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
       if(family != null) {
         compactSplitThread.requestCompaction(region, store, log,
-          Store.PRIORITY_USER);
+          Store.PRIORITY_USER, null);
       } else {
         compactSplitThread.requestCompaction(region, log,
-          Store.PRIORITY_USER);
+          Store.PRIORITY_USER, null);
       }
       return CompactRegionResponse.newBuilder().build();
     } catch (IOException ie) {
@@ -4034,4 +4034,11 @@ public class HRegionServer implements Cl
     String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
     return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
   }
+
+  /**
+   * @return the underlying {@link CompactSplitThread} for this server
+   */
+  public CompactSplitThread getCompactSplitThread() {
+    return this.compactSplitThread;
+  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Wed Feb 20 22:40:55 2013
@@ -1090,14 +1090,13 @@ public class HStore implements Store {
     List<StoreFile> sfs = new ArrayList<StoreFile>();
     long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
     try {
-      List<Path> newFiles =
-        this.compactor.compact(filesToCompact, cr.isMajor());
+      List<Path> newFiles = this.compactor.compact(cr);
       // Move the compaction into place.
       if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
         for (Path newFile: newFiles) {
           StoreFile sf = completeCompaction(filesToCompact, newFile);
           if (region.getCoprocessorHost() != null) {
-            region.getCoprocessorHost().postCompact(this, sf);
+            region.getCoprocessorHost().postCompact(this, sf, cr);
           }
           sfs.add(sf);
         }
@@ -1181,13 +1180,12 @@ public class HStore implements Store {
 
     try {
       // Ready to go. Have list of files to compact.
-      List<Path> newFiles =
-        this.compactor.compact(filesToCompact, isMajor);
+      List<Path> newFiles = this.compactor.compactForTesting(filesToCompact, isMajor);
       for (Path newFile: newFiles) {
         // Move the compaction into place.
         StoreFile sf = completeCompaction(filesToCompact, newFile);
         if (region.getCoprocessorHost() != null) {
-          region.getCoprocessorHost().postCompact(this, sf);
+          region.getCoprocessorHost().postCompact(this, sf, null);
         }
       }
     } finally {
@@ -1219,17 +1217,19 @@ public class HStore implements Store {
     return compactionPolicy.isMajorCompaction(this.storeFileManager.getStorefiles());
   }
 
+  @Override
   public CompactionRequest requestCompaction() throws IOException {
-    return requestCompaction(Store.NO_PRIORITY);
+    return requestCompaction(Store.NO_PRIORITY, null);
   }
 
-  public CompactionRequest requestCompaction(int priority) throws IOException {
+  @Override
+  public CompactionRequest requestCompaction(int priority, CompactionRequest request)
+      throws IOException {
     // don't even select for compaction if writes are disabled
     if (!this.region.areWritesEnabled()) {
       return null;
     }
 
-    CompactionRequest ret = null;
     this.lock.readLock().lock();
     try {
       List<StoreFile> candidates = Lists.newArrayList(storeFileManager.getStorefiles());
@@ -1238,7 +1238,7 @@ public class HStore implements Store {
         candidates = compactionPolicy.preSelectCompaction(candidates, filesCompacting);
         boolean override = false;
         if (region.getCoprocessorHost() != null) {
-          override = region.getCoprocessorHost().preCompactSelection(this, candidates);
+          override = region.getCoprocessorHost().preCompactSelection(this, candidates, request);
         }
         CompactSelection filesToCompact;
         if (override) {
@@ -1257,7 +1257,7 @@ public class HStore implements Store {
 
         if (region.getCoprocessorHost() != null) {
           region.getCoprocessorHost().postCompactSelection(this,
-              ImmutableList.copyOf(filesToCompact.getFilesToCompact()));
+            ImmutableList.copyOf(filesToCompact.getFilesToCompact()), request);
         }
 
         // no files to compact
@@ -1287,15 +1287,24 @@ public class HStore implements Store {
 
         // everything went better than expected. create a compaction request
         int pri = getCompactPriority(priority);
-        ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
+        //not a special compaction request, so we need to make one
+        if(request == null){
+          request = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
+        }else{
+          // update the request with what the system thinks the request should be;
+          // it's up to the request whether it wants to listen
+          request.setSelection(filesToCompact);
+          request.setIsMajor(isMajor);
+          request.setPriority(pri);
+        }
       }
     } finally {
       this.lock.readLock().unlock();
     }
-    if (ret != null) {
-      this.region.reportCompactionRequestStart(ret.isMajor());
+    if (request != null) {
+      this.region.reportCompactionRequestStart(request.isMajor());
     }
-    return ret;
+    return request;
   }
 
   public void finishRequest(CompactionRequest cr) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Wed Feb 20 22:40:55 2013
@@ -55,6 +55,8 @@ import org.apache.hadoop.hbase.coprocess
 import org.apache.hadoop.hbase.filter.ByteArrayComparable;
 import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -345,11 +347,10 @@ public class RegionCoprocessorHost
 
   /**
    * See
-   * {@link RegionObserver#preCompactScannerOpen(ObserverContext,
-   *    Store, List, ScanType, long, InternalScanner)}
+   * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
    */
   public InternalScanner preCompactScannerOpen(Store store, List<StoreFileScanner> scanners,
-      ScanType scanType, long earliestPutTs) throws IOException {
+      ScanType scanType, long earliestPutTs, CompactionRequest request) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     InternalScanner s = null;
     for (RegionEnvironment env: coprocessors) {
@@ -357,7 +358,7 @@ public class RegionCoprocessorHost
         ctx = ObserverContext.createAndPrepare(env, ctx);
         try {
           s = ((RegionObserver) env.getInstance()).preCompactScannerOpen(ctx, store, scanners,
-              scanType, earliestPutTs, s);
+            scanType, earliestPutTs, s, request);
         } catch (Throwable e) {
           handleCoprocessorThrowable(env,e);
         }
@@ -370,22 +371,23 @@ public class RegionCoprocessorHost
   }
 
   /**
-   * Called prior to selecting the {@link StoreFile}s for compaction from
-   * the list of currently available candidates.
+   * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
+   * available candidates.
    * @param store The store where compaction is being requested
    * @param candidates The currently available store files
+   * @param request custom compaction request
    * @return If {@code true}, skip the normal selection process and use the current list
    * @throws IOException
    */
-  public boolean preCompactSelection(Store store, List<StoreFile> candidates) throws IOException {
+  public boolean preCompactSelection(Store store, List<StoreFile> candidates,
+      CompactionRequest request) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     boolean bypass = false;
     for (RegionEnvironment env: coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
         ctx = ObserverContext.createAndPrepare(env, ctx);
         try {
-          ((RegionObserver)env.getInstance()).preCompactSelection(
-              ctx, store, candidates);
+          ((RegionObserver) env.getInstance()).preCompactSelection(ctx, store, candidates, request);
         } catch (Throwable e) {
           handleCoprocessorThrowable(env,e);
 
@@ -400,20 +402,20 @@ public class RegionCoprocessorHost
   }
 
   /**
-   * Called after the {@link StoreFile}s to be compacted have been selected
-   * from the available candidates.
+   * Called after the {@link StoreFile}s to be compacted have been selected from the available
+   * candidates.
    * @param store The store where compaction is being requested
    * @param selected The store files selected to compact
+   * @param request custom compaction request
    */
-  public void postCompactSelection(Store store,
-      ImmutableList<StoreFile> selected) {
+  public void postCompactSelection(Store store, ImmutableList<StoreFile> selected,
+      CompactionRequest request) {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env: coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
         ctx = ObserverContext.createAndPrepare(env, ctx);
         try {
-          ((RegionObserver)env.getInstance()).postCompactSelection(
-              ctx, store, selected);
+          ((RegionObserver) env.getInstance()).postCompactSelection(ctx, store, selected, request);
         } catch (Throwable e) {
           handleCoprocessorThrowableNoRethrow(env,e);
         }
@@ -429,18 +431,19 @@ public class RegionCoprocessorHost
    * @param store the store being compacted
    * @param scanner the scanner used to read store data during compaction
    * @param scanType type of Scan
+   * @param request the compaction that will be executed
    * @throws IOException
    */
-  public InternalScanner preCompact(Store store, InternalScanner scanner,
-      ScanType scanType) throws IOException {
+  public InternalScanner preCompact(Store store, InternalScanner scanner, ScanType scanType,
+      CompactionRequest request) throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     boolean bypass = false;
     for (RegionEnvironment env: coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
         ctx = ObserverContext.createAndPrepare(env, ctx);
         try {
-          scanner = ((RegionObserver)env.getInstance()).preCompact(
-              ctx, store, scanner, scanType);
+          scanner = ((RegionObserver) env.getInstance()).preCompact(ctx, store, scanner, scanType,
+            request);
         } catch (Throwable e) {
           handleCoprocessorThrowable(env,e);
         }
@@ -457,15 +460,17 @@ public class RegionCoprocessorHost
    * Called after the store compaction has completed.
    * @param store the store being compacted
    * @param resultFile the new store file written during compaction
+   * @param request the compaction that is being executed
    * @throws IOException
    */
-  public void postCompact(Store store, StoreFile resultFile) throws IOException {
+  public void postCompact(Store store, StoreFile resultFile, CompactionRequest request)
+      throws IOException {
     ObserverContext<RegionCoprocessorEnvironment> ctx = null;
     for (RegionEnvironment env: coprocessors) {
       if (env.getInstance() instanceof RegionObserver) {
         ctx = ObserverContext.createAndPrepare(env, ctx);
         try {
-          ((RegionObserver)env.getInstance()).postCompact(ctx, store, resultFile);
+          ((RegionObserver) env.getInstance()).postCompact(ctx, store, resultFile, request);
         } catch (Throwable e) {
           handleCoprocessorThrowable(env, e);
         }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Wed Feb 20 22:40:55 2013
@@ -160,7 +160,8 @@ public interface Store extends HeapSize,
 
   public CompactionRequest requestCompaction() throws IOException;
 
-  public CompactionRequest requestCompaction(int priority) throws IOException;
+  public CompactionRequest requestCompaction(int priority, CompactionRequest request)
+      throws IOException;
 
   public void finishRequest(CompactionRequest cr);
 

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Wed Feb 20 22:40:55 2013
@@ -19,20 +19,22 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionHandler;
 import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.util.StringUtils;
@@ -46,35 +48,51 @@ import com.google.common.collect.Collect
 /**
  * This class holds all details necessary to run a compaction.
  */
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate({ "coprocessor" })
+@InterfaceStability.Evolving
 public class CompactionRequest implements Comparable<CompactionRequest>,
     Runnable {
     static final Log LOG = LogFactory.getLog(CompactionRequest.class);
     private final HRegion region;
     private final HStore store;
-    private final CompactSelection compactSelection;
-    private final long totalSize;
-    private final boolean isMajor;
+    private CompactSelection compactSelection;
+    private long totalSize;
+    private boolean isMajor;
     private int priority;
     private final Long timeInNanos;
     private HRegionServer server = null;
 
-    public CompactionRequest(HRegion region, HStore store,
-        CompactSelection files, boolean isMajor, int priority) {
-      Preconditions.checkNotNull(region);
-      Preconditions.checkNotNull(files);
+    public static CompactionRequest getRequestForTesting(Collection<StoreFile> selection,
+        boolean isMajor) {
+      return new CompactionRequest(null, null, new CompactSelection(new ArrayList<StoreFile>(
+        selection)), isMajor, 0, System.nanoTime());
+    }
+
+    /**
+     * Constructor for a custom compaction. Use the setXXX methods to update the state of the
+     * compaction before it is used.
+     */
+    public CompactionRequest(HRegion region, HStore store, int priority) {
+    this(region, store, null, false, priority, System
+        .nanoTime());
+    }
+
+    public CompactionRequest(HRegion r, HStore s, CompactSelection files, boolean isMajor, int p) {
+      // delegate to the internal constructor after checking basic preconditions
+      this(Preconditions.checkNotNull(r), s, Preconditions.checkNotNull(files), isMajor, p, System
+          .nanoTime());
+    }
 
+    private CompactionRequest(HRegion region, HStore store, CompactSelection files, boolean isMajor,
+        int priority, long startTime) {
       this.region = region;
       this.store = store;
-      this.compactSelection = files;
-      long sz = 0;
-      for (StoreFile sf : files.getFilesToCompact()) {
-        sz += sf.getReader().length();
-      }
-      this.totalSize = sz;
       this.isMajor = isMajor;
       this.priority = priority;
-      this.timeInNanos = System.nanoTime();
+      this.timeInNanos = startTime;
+      if (files != null) {
+        this.setSelection(files);
+      }
     }
 
     /**
@@ -162,6 +180,28 @@ public class CompactionRequest implement
       this.server = hrs;
     }
 
+    /**
+     * Set the files (and, implicitly, the size of the compaction based on those files)
+     * @param files files that should be included in the compaction
+     */
+    public void setSelection(CompactSelection files) {
+      long sz = 0;
+      for (StoreFile sf : files.getFilesToCompact()) {
+        sz += sf.getReader().length();
+      }
+      this.totalSize = sz;
+      this.compactSelection = files;
+    }
+
+    /**
+     * Specify if this compaction should be a major compaction based on the state of the store
+     * @param isMajor <tt>true</tt> if the system determines that this compaction should be a major
+     *          compaction
+     */
+    public void setIsMajor(boolean isMajor) {
+      this.isMajor = isMajor;
+    }
+
     @Override
     public String toString() {
       String fsList = Joiner.on(", ").join(
@@ -200,12 +240,11 @@ public class CompactionRequest implement
         if (completed) {
           // degenerate case: blocked regions require recursive enqueues
           if (store.getCompactPriority() <= 0) {
-            server.compactSplitThread
-              .requestCompaction(region, store, "Recursive enqueue");
-          } else {
-            // see if the compaction has caused us to exceed max region size
-            server.compactSplitThread.requestSplit(region);
-          }
+            server.compactSplitThread.requestCompaction(region, store, "Recursive enqueue", null);
+            } else {
+              // see if the compaction has caused us to exceed max region size
+          server.getCompactSplitThread().requestSplit(region);
+            }
         }
       } catch (IOException ex) {
         LOG.error("Compaction failed " + this, RemoteExceptionHandler
@@ -234,4 +273,4 @@ public class CompactionRequest implement
         }
       }
     }
-  }
+}

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java Wed Feb 20 22:40:55 2013
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.regionserver.compactions;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
@@ -41,15 +42,28 @@ public abstract class Compactor {
 
   /**
    * Do a minor/major compaction on an explicit set of storefiles from a Store.
-   *
    * @param filesToCompact which files to compact
-   * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
-   * @return Product of compaction or an empty list if all cells expired or deleted and
-   * nothing made it through the compaction.
+   * @param request the requested compaction
+   * @return Product of compaction or an empty list if all cells expired or deleted and nothing made
+   *         it through the compaction.
    * @throws IOException
    */
-  public abstract List<Path> compact(final Collection<StoreFile> filesToCompact,
-    final boolean majorCompaction) throws IOException;
+  public abstract List<Path> compact(final CompactionRequest request) throws IOException;
+
+  /**
+   * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
+   * {@link #compact(CompactionRequest)}.
+   * @param filesToCompact the files to compact. These are used as the compactionSelection for the
+   *          generated {@link CompactionRequest}.
+   * @param isMajor true to major compact (prune all deletes, max versions, etc)
+   * @return Product of compaction or an empty list if all cells expired or deleted and nothing made
+   *         it through the compaction.
+   * @throws IOException
+   */
+  public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
+      throws IOException {
+    return compact(CompactionRequest.getRequestForTesting(filesToCompact, isMajor));
+  }
 
   public CompactionProgress getProgress() {
     return this.progress;

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java Wed Feb 20 22:40:55 2013
@@ -59,16 +59,12 @@ public class DefaultCompactor extends Co
 
   /**
    * Do a minor/major compaction on an explicit set of storefiles from a Store.
-   *
-   * @param filesToCompact which files to compact
-   * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
-   * @return Product of compaction or an empty list if all cells expired or deleted and
-   * nothing made it through the compaction.
-   * @throws IOException
    */
   @SuppressWarnings("deprecation")
-  public List<Path> compact(final Collection<StoreFile> filesToCompact,
-      final boolean majorCompaction) throws IOException {
+  @Override
+  public List<Path> compact(final CompactionRequest request) throws IOException {
+    final Collection<StoreFile> filesToCompact = request.getFiles();
+    boolean majorCompaction = request.isMajor();
     // Max-sequenceID is the last key in the files we're compacting
     long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
 
@@ -139,7 +135,8 @@ public class DefaultCompactor extends Co
           scanner = store
               .getCoprocessorHost()
               .preCompactScannerOpen(store, scanners,
-                  majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
+                majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs,
+                request);
         }
         ScanType scanType = majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT;
         if (scanner == null) {
@@ -150,11 +147,11 @@ public class DefaultCompactor extends Co
             scanType, smallestReadPoint, earliestPutTs);
         }
         if (store.getCoprocessorHost() != null) {
-          InternalScanner cpScanner =
-            store.getCoprocessorHost().preCompact(store, scanner, scanType);
+          InternalScanner cpScanner = store.getCoprocessorHost().preCompact(store, scanner,
+            scanType, request);
           // NULL scanner returned from coprocessor hooks means skip normal processing
           if (cpScanner == null) {
-            return newFiles;  // an empty list
+            return newFiles; // an empty list
           }
           scanner = cpScanner;
         }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Wed Feb 20 22:40:55 2013
@@ -28,6 +28,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,13 +48,17 @@ import org.apache.hadoop.hbase.client.Pu
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.regionserver.compactions.*;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -591,7 +596,7 @@ public class TestCompaction extends HBas
     Collection<StoreFile> storeFiles = store.getStorefiles();
     Compactor tool = store.compactor;
 
-    List<Path> newFiles = tool.compact(storeFiles, false);
+    List<Path> newFiles = tool.compactForTesting(storeFiles, false);
 
     // Now lets corrupt the compacted file.
     FileSystem fs = FileSystem.get(conf);
@@ -630,7 +635,7 @@ public class TestCompaction extends HBas
     }
     store.triggerMajorCompaction();
 
-    CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY);
+    CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null);
     assertNotNull("Expected to receive a compaction request", request);
     assertEquals(
       "System-requested major compaction should not occur if there are too many store files",
@@ -648,7 +653,7 @@ public class TestCompaction extends HBas
       createStoreFile(r);
     }
     store.triggerMajorCompaction();
-    CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER);
+    CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null);
     assertNotNull("Expected to receive a compaction request", request);
     assertEquals(
       "User-requested major compaction should always occur, even if there are too many store files",
@@ -656,5 +661,53 @@ public class TestCompaction extends HBas
       request.isMajor());
   }
 
-}
+  /**
+   * Create a custom compaction request and be sure that we can track it through the queue, knowing
+   * when the compaction is completed.
+   */
+  public void testTrackingCompactionRequest() throws Exception {
+    // setup a compact/split thread on a mock server
+    HRegionServer mockServer = Mockito.mock(HRegionServer.class);
+    Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
+    CompactSplitThread thread = new CompactSplitThread(mockServer);
+    Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
+
+    // setup a region/store with some files
+    Store store = r.getStore(COLUMN_FAMILY);
+    createStoreFile(r);
+    for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
+      createStoreFile(r);
+    }
+
+    CountDownLatch latch = new CountDownLatch(1);
+    TrackableCompactionRequest request = new TrackableCompactionRequest(r, (HStore) store, latch);
+    thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request);
+    // wait for the latch to complete.
+    latch.await();
 
+    thread.interruptIfNecessary();
+  }
+
+  /**
+   * Simple {@link CompactionRequest} on which you can wait until the requested compaction finishes.
+   */
+  public static class TrackableCompactionRequest extends CompactionRequest {
+    private CountDownLatch done;
+
+    /**
+     * Constructor for a custom compaction. Use the setXXX methods to update the state of the
+     * compaction before it is used.
+     */
+    public TrackableCompactionRequest(HRegion region, HStore store, CountDownLatch finished) {
+      super(region, store, Store.PRIORITY_USER);
+      this.done = finished;
+    }
+
+    @Override
+    public void run() {
+      super.run();
+      this.done.countDown();
+    }
+  }
+
+}