You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jy...@apache.org on 2013/02/20 23:40:55 UTC
svn commit: r1448449 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/coprocessor/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/compactions/
test/java/org/apache/hadoop/hbase/region...
Author: jyates
Date: Wed Feb 20 22:40:55 2013
New Revision: 1448449
URL: http://svn.apache.org/r1448449
Log:
HBASE-7725: Add ability to create custom compaction request
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java Wed Feb 20 22:40:55 2013
@@ -41,6 +41,8 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Pair;
@@ -135,28 +137,63 @@ public abstract class BaseRegionObserver
final Store store, final List<StoreFile> candidates) throws IOException { }
@Override
+ public void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, final List<StoreFile> candidates, final CompactionRequest request)
+ throws IOException {
+ preCompactSelection(c, store, candidates);
+ }
+
+ @Override
public void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final ImmutableList<StoreFile> selected) { }
@Override
+ public void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, final ImmutableList<StoreFile> selected, CompactionRequest request) {
+ postCompactSelection(c, store, selected);
+ }
+
+ @Override
public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
final Store store, final InternalScanner scanner, final ScanType scanType)
- throws IOException {
+ throws IOException {
return scanner;
}
@Override
- public InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
- final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
- final long earliestPutTs, final InternalScanner s) throws IOException {
+ public InternalScanner preCompact(ObserverContext<RegionCoprocessorEnvironment> e,
+ final Store store, final InternalScanner scanner, final ScanType scanType,
+ CompactionRequest request) throws IOException {
+ return preCompact(e, store, scanner, scanType);
+ }
+
+ @Override
+ public InternalScanner preCompactScannerOpen(
+ final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+ List<? extends KeyValueScanner> scanners, final ScanType scanType, final long earliestPutTs,
+ final InternalScanner s) throws IOException {
return null;
}
@Override
+ public InternalScanner preCompactScannerOpen(
+ final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+ List<? extends KeyValueScanner> scanners, final ScanType scanType, final long earliestPutTs,
+ final InternalScanner s, CompactionRequest request) throws IOException {
+ return preCompactScannerOpen(c, store, scanners, scanType, earliestPutTs, s);
+ }
+
+ @Override
public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
final StoreFile resultFile) throws IOException {
}
+@Override
+ public void postCompact(ObserverContext<RegionCoprocessorEnvironment> e, final Store store,
+ final StoreFile resultFile, CompactionRequest request) throws IOException {
+ postCompact(e, store, resultFile);
+ }
+
@Override
public void preGetClosestRowBefore(final ObserverContext<RegionCoprocessorEnvironment> e,
final byte [] row, final byte [] family, final Result result)
@@ -351,4 +388,4 @@ public abstract class BaseRegionObserver
List<Pair<byte[], String>> familyPaths, boolean hasLoaded) throws IOException {
return hasLoaded;
}
-}
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/RegionObserver.java Wed Feb 20 22:40:55 2013
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreFileScanner;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
@@ -126,87 +127,184 @@ public interface RegionObserver extends
final StoreFile resultFile) throws IOException;
/**
- * Called prior to selecting the {@link StoreFile}s to compact from the list
- * of available candidates. To alter the files used for compaction, you may
- * mutate the passed in list of candidates.
+ * Called prior to selecting the {@link StoreFile StoreFiles} to compact from the list of
+ * available candidates. To alter the files used for compaction, you may mutate the passed in list
+ * of candidates.
* @param c the environment provided by the region server
* @param store the store where compaction is being requested
* @param candidates the store files currently available for compaction
+ * @param request custom compaction request
* @throws IOException if an error occurred on the coprocessor
*/
void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, final List<StoreFile> candidates, final CompactionRequest request)
+ throws IOException;
+
+ /**
+ * Called prior to selecting the {@link StoreFile}s to compact from the list of available
+ * candidates. To alter the files used for compaction, you may mutate the passed in list of
+ * candidates.
+ * @param c the environment provided by the region server
+ * @param store the store where compaction is being requested
+ * @param candidates the store files currently available for compaction
+ * @throws IOException if an error occurred on the coprocessor
+ * @deprecated Use {@link #preCompactSelection(ObserverContext, Store, List, Object)} instead
+ */
+ void preCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final List<StoreFile> candidates) throws IOException;
/**
- * Called after the {@link StoreFile}s to compact have been selected from the
- * available candidates.
+ * Called after the {@link StoreFile}s to compact have been selected from the available
+ * candidates.
+ * @param c the environment provided by the region server
+ * @param store the store being compacted
+ * @param selected the store files selected to compact
+ * @param request custom compaction request
+ */
+ void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, final ImmutableList<StoreFile> selected, CompactionRequest request);
+
+ /**
+ * Called after the {@link StoreFile}s to compact have been selected from the available
+ * candidates.
* @param c the environment provided by the region server
* @param store the store being compacted
* @param selected the store files selected to compact
+ * @param compactionAttributes custom attributes for the compaction
+ * @deprecated use {@link #postCompactSelection(ObserverContext, Store, ImmutableList, Object)}
+ * instead.
*/
+ @Deprecated
void postCompactSelection(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, final ImmutableList<StoreFile> selected);
/**
- * Called prior to writing the {@link StoreFile}s selected for compaction into
- * a new {@code StoreFile}. To override or modify the compaction process,
- * implementing classes have two options:
+ * Called prior to writing the {@link StoreFile}s selected for compaction into a new
+ * {@code StoreFile}. To override or modify the compaction process, implementing classes have two
+ * options:
* <ul>
- * <li>Wrap the provided {@link InternalScanner} with a custom
- * implementation that is returned from this method. The custom scanner
- * can then inspect {@link KeyValue}s from the wrapped scanner, applying
- * its own policy to what gets written.</li>
- * <li>Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()}
- * and provide a custom implementation for writing of new
- * {@link StoreFile}s. <strong>Note: any implementations bypassing
- * core compaction using this approach must write out new store files
- * themselves or the existing data will no longer be available after
- * compaction.</strong></li>
+ * <li>Wrap the provided {@link InternalScanner} with a custom implementation that is returned
+ * from this method. The custom scanner can then inspect {@link KeyValue}s from the wrapped
+ * scanner, applying its own policy to what gets written.</li>
+ * <li>Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} and provide a
+ * custom implementation for writing of new {@link StoreFile}s. <strong>Note: any implementations
+ * bypassing core compaction using this approach must write out new store files themselves or the
+ * existing data will no longer be available after compaction.</strong></li>
* </ul>
* @param c the environment provided by the region server
* @param store the store being compacted
- * @param scanner the scanner over existing data used in the store file
- * rewriting
+ * @param scanner the scanner over existing data used in the store file rewriting
* @param scanType type of Scan
- * @return the scanner to use during compaction. Should not be {@code null}
- * unless the implementation is writing new store files on its own.
+ * @param request the requested compaction
+ * @return the scanner to use during compaction. Should not be {@code null} unless the
+ * implementation is writing new store files on its own.
* @throws IOException if an error occurred on the coprocessor
*/
InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
- final Store store, final InternalScanner scanner,
- final ScanType scanType) throws IOException;
+ final Store store, final InternalScanner scanner, final ScanType scanType,
+ CompactionRequest request) throws IOException;
/**
- * Called prior to writing the {@link StoreFile}s selected for compaction into
- * a new {@code StoreFile} and prior to creating the scanner used to read the
- * input files. To override or modify the compaction process,
- * implementing classes can return a new scanner to provide the KeyValues to be
- * stored into the new {@code StoreFile} or null to perform the default processing.
- * Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
+ * Called prior to writing the {@link StoreFile}s selected for compaction into a new
+ * {@code StoreFile}. To override or modify the compaction process, implementing classes have two
+ * options:
+ * <ul>
+ * <li>Wrap the provided {@link InternalScanner} with a custom implementation that is returned
+ * from this method. The custom scanner can then inspect {@link KeyValue}s from the wrapped
+ * scanner, applying its own policy to what gets written.</li>
+ * <li>Call {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} and provide a
+ * custom implementation for writing of new {@link StoreFile}s. <strong>Note: any implementations
+ * bypassing core compaction using this approach must write out new store files themselves or the
+ * existing data will no longer be available after compaction.</strong></li>
+ * </ul>
+ * @param c the environment provided by the region server
+ * @param store the store being compacted
+ * @param scanner the scanner over existing data used in the store file rewriting
+ * @param scanType type of Scan
+ * @param request the requested compaction
+ * @return the scanner to use during compaction. Should not be {@code null} unless the
+ * implementation is writing new store files on its own.
+ * @throws IOException if an error occurred on the coprocessor
+ * @deprecated use
+ * {@link #preCompact(ObserverContext, Store, InternalScanner, ScanType, CompactionRequest)}
+ * instead
+ */
+ @Deprecated
+ InternalScanner preCompact(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, final InternalScanner scanner, final ScanType scanType) throws IOException;
+
+ /**
+ * Called prior to writing the {@link StoreFile}s selected for compaction into a new
+ * {@code StoreFile} and prior to creating the scanner used to read the input files. To override
+ * or modify the compaction process, implementing classes can return a new scanner to provide the
+ * KeyValues to be stored into the new {@code StoreFile} or null to perform the default
+ * processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
* effect in this hook.
* @param c the environment provided by the region server
* @param store the store being compacted
* @param scanners the list {@link StoreFileScanner}s to be read from
* @param scanType the {@link ScanType} indicating whether this is a major or minor compaction
- * @param earliestPutTs timestamp of the earliest put that was found in any of the involved
- * store files
+ * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store
+ * files
* @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
- * @return the scanner to use during compaction. {@code null} if the default implementation
- * is to be used.
+ * @param request the requested compaction
+ * @return the scanner to use during compaction. {@code null} if the default implementation is to
+ * be used.
* @throws IOException if an error occurred on the coprocessor
*/
InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
+ final long earliestPutTs, final InternalScanner s, CompactionRequest request)
+ throws IOException;
+
+ /**
+ * Called prior to writing the {@link StoreFile}s selected for compaction into a new
+ * {@code StoreFile} and prior to creating the scanner used to read the input files. To override
+ * or modify the compaction process, implementing classes can return a new scanner to provide the
+ * KeyValues to be stored into the new {@code StoreFile} or null to perform the default
+ * processing. Calling {@link org.apache.hadoop.hbase.coprocessor.ObserverContext#bypass()} has no
+ * effect in this hook.
+ * @param c the environment provided by the region server
+ * @param store the store being compacted
+ * @param scanners the list {@link StoreFileScanner}s to be read from
+ * @param scanType the {@link ScanType} indicating whether this is a major or minor compaction
+ * @param earliestPutTs timestamp of the earliest put that was found in any of the involved store
+ * files
+ * @param s the base scanner, if not {@code null}, from previous RegionObserver in the chain
+ * @param request the requested compaction
+ * @return the scanner to use during compaction. {@code null} if the default implementation is to
+ * be used.
+ * @throws IOException if an error occurred on the coprocessor
+ * @deprecated Use
+ * {@link #preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
+ * instead.
+ */
+ @Deprecated
+ InternalScanner preCompactScannerOpen(final ObserverContext<RegionCoprocessorEnvironment> c,
+ final Store store, List<? extends KeyValueScanner> scanners, final ScanType scanType,
final long earliestPutTs, final InternalScanner s) throws IOException;
/**
- * Called after compaction has completed and the new store file has been
- * moved in to place.
+ * Called after compaction has completed and the new store file has been moved in to place.
+ * @param c the environment provided by the region server
+ * @param store the store being compacted
+ * @param resultFile the new store file written out during compaction
+ * @param request the requested compaction
+ * @throws IOException if an error occurred on the coprocessor
+ */
+ void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
+ StoreFile resultFile, CompactionRequest request) throws IOException;
+
+ /**
+ * Called after compaction has completed and the new store file has been moved in to place.
* @param c the environment provided by the region server
* @param store the store being compacted
* @param resultFile the new store file written out during compaction
* @throws IOException if an error occurred on the coprocessor
+ * @deprecated Use {@link #postCompact(ObserverContext, Store, StoreFile, CompactionRequest)}
+ * instead
*/
+ @Deprecated
void postCompact(final ObserverContext<RegionCoprocessorEnvironment> c, final Store store,
StoreFile resultFile) throws IOException;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Wed Feb 20 22:40:55 2013
@@ -19,7 +19,9 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
@@ -183,23 +185,41 @@ public class CompactSplitThread implemen
}
}
- public synchronized void requestCompaction(final HRegion r,
- final String why) throws IOException {
- for (Store s : r.getStores().values()) {
- requestCompaction(r, s, why, Store.NO_PRIORITY);
- }
+ @Override
+ public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why)
+ throws IOException {
+ return requestCompaction(r, why, null);
+ }
+
+ @Override
+ public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
+ List<CompactionRequest> requests) throws IOException {
+ return requestCompaction(r, why, Store.NO_PRIORITY, requests);
}
- public synchronized void requestCompaction(final HRegion r, final Store s,
- final String why) throws IOException {
- requestCompaction(r, s, why, Store.NO_PRIORITY);
+ @Override
+ public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
+ final String why, CompactionRequest request) throws IOException {
+ return requestCompaction(r, s, why, Store.NO_PRIORITY, request);
}
- public synchronized void requestCompaction(final HRegion r, final String why,
- int p) throws IOException {
- for (Store s : r.getStores().values()) {
- requestCompaction(r, s, why, p);
+ @Override
+ public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
+ int p, List<CompactionRequest> requests) throws IOException {
+ // not a special compaction request, so make our own list
+ List<CompactionRequest> ret;
+ if (requests == null) {
+ ret = new ArrayList<CompactionRequest>(r.getStores().size());
+ for (Store s : r.getStores().values()) {
+ ret.add(requestCompaction(r, s, why, p, null));
+ }
+ } else {
+ ret = new ArrayList<CompactionRequest>(requests.size());
+ for (CompactionRequest request : requests) {
+ requests.add(requestCompaction(r, request.getStore(), why, p, request));
+ }
}
+ return ret;
}
/**
@@ -207,13 +227,15 @@ public class CompactSplitThread implemen
* @param s Store to request compaction on
* @param why Why compaction requested -- used in debug messages
* @param priority override the default priority (NO_PRIORITY == decide)
+ * @param request custom compaction request. Can be <tt>null</tt> in which case a simple
+ * compaction will be used.
*/
- public synchronized void requestCompaction(final HRegion r, final Store s,
- final String why, int priority) throws IOException {
+ public synchronized CompactionRequest requestCompaction(final HRegion r, final Store s,
+ final String why, int priority, CompactionRequest request) throws IOException {
if (this.server.isStopped()) {
- return;
+ return null;
}
- CompactionRequest cr = s.requestCompaction(priority);
+ CompactionRequest cr = s.requestCompaction(priority, request);
if (cr != null) {
cr.setServer(server);
if (priority != Store.NO_PRIORITY) {
@@ -234,6 +256,7 @@ public class CompactSplitThread implemen
" because compaction request was cancelled");
}
}
+ return cr;
}
/**
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java Wed Feb 20 22:40:55 2013
@@ -19,42 +19,73 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.List;
+
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@InterfaceAudience.Private
public interface CompactionRequestor {
/**
* @param r Region to compact
* @param why Why compaction was requested -- used in debug messages
+ * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
+ * compactions were started
* @throws IOException
*/
- public void requestCompaction(final HRegion r, final String why) throws IOException;
+ public List<CompactionRequest> requestCompaction(final HRegion r, final String why)
+ throws IOException;
/**
* @param r Region to compact
- * @param s Store within region to compact
* @param why Why compaction was requested -- used in debug messages
+ * @param requests custom compaction requests. Each compaction must specify the store on which it
+ * is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all
+ * stores for the region.
+ * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
+ * compactions were started
* @throws IOException
*/
- public void requestCompaction(final HRegion r, final Store s, final String why)
+ public List<CompactionRequest> requestCompaction(final HRegion r, final String why,
+ List<CompactionRequest> requests)
throws IOException;
/**
* @param r Region to compact
+ * @param s Store within region to compact
+ * @param why Why compaction was requested -- used in debug messages
+ * @param request custom compaction request for the {@link HRegion} and {@link Store}. Custom
+ * request must be <tt>null</tt> or be constructed with matching region and store.
+ * @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction was started.
+ * @throws IOException
+ */
+ public CompactionRequest requestCompaction(final HRegion r, final Store s, final String why,
+ CompactionRequest request) throws IOException;
+
+ /**
+ * @param r Region to compact
* @param why Why compaction was requested -- used in debug messages
* @param pri Priority of this compaction. minHeap. <=0 is critical
+ * @param requests custom compaction requests. Each compaction must specify the store on which it
+ * is acting. Can be <tt>null</tt> in which case a compaction will be attempted on all
+ * stores for the region.
+ * @return The created {@link CompactionRequest CompactionRequests} or an empty list if no
+ * compactions were started.
* @throws IOException
*/
- public void requestCompaction(final HRegion r, final String why, int pri) throws IOException;
+ public List<CompactionRequest> requestCompaction(final HRegion r, final String why, int pri,
+ List<CompactionRequest> requests) throws IOException;
/**
* @param r Region to compact
* @param s Store within region to compact
* @param why Why compaction was requested -- used in debug messages
* @param pri Priority of this compaction. minHeap. <=0 is critical
+ * @param request custom compaction request to run. {@link Store} and {@link HRegion} for the
+ * request must match the region and store specified here.
+ * @return The created {@link CompactionRequest} or <tt>null</tt> if no compaction was started
* @throws IOException
*/
- public void requestCompaction(final HRegion r, final Store s,
- final String why, int pri) throws IOException;
-
+ public CompactionRequest requestCompaction(final HRegion r, final Store s, final String why,
+ int pri, CompactionRequest request) throws IOException;
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Wed Feb 20 22:40:55 2013
@@ -1348,17 +1348,17 @@ public class HRegionServer implements Cl
try {
if (s.needsCompaction()) {
// Queue a compaction. Will recognize if major is needed.
- this.instance.compactSplitThread.requestCompaction(r, s,
- getName() + " requests compaction");
+ this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ + " requests compaction", null);
} else if (s.isMajorCompaction()) {
- if (majorCompactPriority == DEFAULT_PRIORITY ||
- majorCompactPriority > r.getCompactPriority()) {
- this.instance.compactSplitThread.requestCompaction(r, s,
- getName() + " requests major compaction; use default priority");
+ if (majorCompactPriority == DEFAULT_PRIORITY
+ || majorCompactPriority > r.getCompactPriority()) {
+ this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ + " requests major compaction; use default priority", null);
} else {
- this.instance.compactSplitThread.requestCompaction(r, s,
- getName() + " requests major compaction; use configured priority",
- this.majorCompactPriority);
+ this.instance.compactSplitThread.requestCompaction(r, s, getName()
+ + " requests major compaction; use configured priority",
+ this.majorCompactPriority, null);
}
}
} catch (IOException e) {
@@ -1665,7 +1665,7 @@ public class HRegionServer implements Cl
// Do checks to see if we need to compact (references or too many files)
for (Store s : r.getStores().values()) {
if (s.hasReferences() || s.needsCompaction()) {
- getCompactionRequester().requestCompaction(r, s, "Opening Region");
+ getCompactionRequester().requestCompaction(r, s, "Opening Region", null);
}
}
long openSeqNum = r.getOpenSeqNum();
@@ -3630,10 +3630,10 @@ public class HRegionServer implements Cl
String log = "User-triggered " + (major ? "major " : "") + "compaction" + familyLogMsg;
if(family != null) {
compactSplitThread.requestCompaction(region, store, log,
- Store.PRIORITY_USER);
+ Store.PRIORITY_USER, null);
} else {
compactSplitThread.requestCompaction(region, log,
- Store.PRIORITY_USER);
+ Store.PRIORITY_USER, null);
}
return CompactRegionResponse.newBuilder().build();
} catch (IOException ie) {
@@ -4034,4 +4034,11 @@ public class HRegionServer implements Cl
String healthScriptLocation = this.conf.get(HConstants.HEALTH_SCRIPT_LOC);
return org.apache.commons.lang.StringUtils.isNotBlank(healthScriptLocation);
}
+
+ /**
+ * @return the underlying {@link CompactSplitThread} for the servers
+ */
+ public CompactSplitThread getCompactSplitThread() {
+ return this.compactSplitThread;
+ }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Wed Feb 20 22:40:55 2013
@@ -1090,14 +1090,13 @@ public class HStore implements Store {
List<StoreFile> sfs = new ArrayList<StoreFile>();
long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
try {
- List<Path> newFiles =
- this.compactor.compact(filesToCompact, cr.isMajor());
+ List<Path> newFiles = this.compactor.compact(cr);
// Move the compaction into place.
if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
for (Path newFile: newFiles) {
StoreFile sf = completeCompaction(filesToCompact, newFile);
if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postCompact(this, sf);
+ region.getCoprocessorHost().postCompact(this, sf, cr);
}
sfs.add(sf);
}
@@ -1181,13 +1180,12 @@ public class HStore implements Store {
try {
// Ready to go. Have list of files to compact.
- List<Path> newFiles =
- this.compactor.compact(filesToCompact, isMajor);
+ List<Path> newFiles = this.compactor.compactForTesting(filesToCompact, isMajor);
for (Path newFile: newFiles) {
// Move the compaction into place.
StoreFile sf = completeCompaction(filesToCompact, newFile);
if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postCompact(this, sf);
+ region.getCoprocessorHost().postCompact(this, sf, null);
}
}
} finally {
@@ -1219,17 +1217,19 @@ public class HStore implements Store {
return compactionPolicy.isMajorCompaction(this.storeFileManager.getStorefiles());
}
+ @Override
public CompactionRequest requestCompaction() throws IOException {
- return requestCompaction(Store.NO_PRIORITY);
+ return requestCompaction(Store.NO_PRIORITY, null);
}
- public CompactionRequest requestCompaction(int priority) throws IOException {
+ @Override
+ public CompactionRequest requestCompaction(int priority, CompactionRequest request)
+ throws IOException {
// don't even select for compaction if writes are disabled
if (!this.region.areWritesEnabled()) {
return null;
}
- CompactionRequest ret = null;
this.lock.readLock().lock();
try {
List<StoreFile> candidates = Lists.newArrayList(storeFileManager.getStorefiles());
@@ -1238,7 +1238,7 @@ public class HStore implements Store {
candidates = compactionPolicy.preSelectCompaction(candidates, filesCompacting);
boolean override = false;
if (region.getCoprocessorHost() != null) {
- override = region.getCoprocessorHost().preCompactSelection(this, candidates);
+ override = region.getCoprocessorHost().preCompactSelection(this, candidates, request);
}
CompactSelection filesToCompact;
if (override) {
@@ -1257,7 +1257,7 @@ public class HStore implements Store {
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompactSelection(this,
- ImmutableList.copyOf(filesToCompact.getFilesToCompact()));
+ ImmutableList.copyOf(filesToCompact.getFilesToCompact()), request);
}
// no files to compact
@@ -1287,15 +1287,24 @@ public class HStore implements Store {
// everything went better than expected. create a compaction request
int pri = getCompactPriority(priority);
- ret = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
+ //not a special compaction request, so we need to make one
+ if(request == null){
+ request = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
+ }else{
+ //update the request with what the system thinks the request should be
+ //its up to the request if it wants to listen
+ request.setSelection(filesToCompact);
+ request.setIsMajor(isMajor);
+ request.setPriority(pri);
+ }
}
} finally {
this.lock.readLock().unlock();
}
- if (ret != null) {
- this.region.reportCompactionRequestStart(ret.isMajor());
+ if (request != null) {
+ this.region.reportCompactionRequestStart(request.isMajor());
}
- return ret;
+ return request;
}
public void finishRequest(CompactionRequest cr) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Wed Feb 20 22:40:55 2013
@@ -55,6 +55,8 @@ import org.apache.hadoop.hbase.coprocess
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
@@ -345,11 +347,10 @@ public class RegionCoprocessorHost
/**
* See
- * {@link RegionObserver#preCompactScannerOpen(ObserverContext,
- * Store, List, ScanType, long, InternalScanner)}
+ * {@link RegionObserver#preCompactScannerOpen(ObserverContext, Store, List, ScanType, long, InternalScanner, CompactionRequest)}
*/
public InternalScanner preCompactScannerOpen(Store store, List<StoreFileScanner> scanners,
- ScanType scanType, long earliestPutTs) throws IOException {
+ ScanType scanType, long earliestPutTs, CompactionRequest request) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
InternalScanner s = null;
for (RegionEnvironment env: coprocessors) {
@@ -357,7 +358,7 @@ public class RegionCoprocessorHost
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
s = ((RegionObserver) env.getInstance()).preCompactScannerOpen(ctx, store, scanners,
- scanType, earliestPutTs, s);
+ scanType, earliestPutTs, s, request);
} catch (Throwable e) {
handleCoprocessorThrowable(env,e);
}
@@ -370,22 +371,23 @@ public class RegionCoprocessorHost
}
/**
- * Called prior to selecting the {@link StoreFile}s for compaction from
- * the list of currently available candidates.
+ * Called prior to selecting the {@link StoreFile}s for compaction from the list of currently
+ * available candidates.
* @param store The store where compaction is being requested
* @param candidates The currently available store files
+ * @param request custom compaction request
* @return If {@code true}, skip the normal selection process and use the current list
* @throws IOException
*/
- public boolean preCompactSelection(Store store, List<StoreFile> candidates) throws IOException {
+ public boolean preCompactSelection(Store store, List<StoreFile> candidates,
+ CompactionRequest request) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
boolean bypass = false;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
- ((RegionObserver)env.getInstance()).preCompactSelection(
- ctx, store, candidates);
+ ((RegionObserver) env.getInstance()).preCompactSelection(ctx, store, candidates, request);
} catch (Throwable e) {
handleCoprocessorThrowable(env,e);
@@ -400,20 +402,20 @@ public class RegionCoprocessorHost
}
/**
- * Called after the {@link StoreFile}s to be compacted have been selected
- * from the available candidates.
+ * Called after the {@link StoreFile}s to be compacted have been selected from the available
+ * candidates.
* @param store The store where compaction is being requested
* @param selected The store files selected to compact
+ * @param request custom compaction
*/
- public void postCompactSelection(Store store,
- ImmutableList<StoreFile> selected) {
+ public void postCompactSelection(Store store, ImmutableList<StoreFile> selected,
+ CompactionRequest request) {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
- ((RegionObserver)env.getInstance()).postCompactSelection(
- ctx, store, selected);
+ ((RegionObserver) env.getInstance()).postCompactSelection(ctx, store, selected, request);
} catch (Throwable e) {
handleCoprocessorThrowableNoRethrow(env,e);
}
@@ -429,18 +431,19 @@ public class RegionCoprocessorHost
* @param store the store being compacted
* @param scanner the scanner used to read store data during compaction
* @param scanType type of Scan
+ * @param request the compaction that will be executed
* @throws IOException
*/
- public InternalScanner preCompact(Store store, InternalScanner scanner,
- ScanType scanType) throws IOException {
+ public InternalScanner preCompact(Store store, InternalScanner scanner, ScanType scanType,
+ CompactionRequest request) throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
boolean bypass = false;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
- scanner = ((RegionObserver)env.getInstance()).preCompact(
- ctx, store, scanner, scanType);
+ scanner = ((RegionObserver) env.getInstance()).preCompact(ctx, store, scanner, scanType,
+ request);
} catch (Throwable e) {
handleCoprocessorThrowable(env,e);
}
@@ -457,15 +460,17 @@ public class RegionCoprocessorHost
* Called after the store compaction has completed.
* @param store the store being compacted
* @param resultFile the new store file written during compaction
+ * @param request the compaction that is being executed
* @throws IOException
*/
- public void postCompact(Store store, StoreFile resultFile) throws IOException {
+ public void postCompact(Store store, StoreFile resultFile, CompactionRequest request)
+ throws IOException {
ObserverContext<RegionCoprocessorEnvironment> ctx = null;
for (RegionEnvironment env: coprocessors) {
if (env.getInstance() instanceof RegionObserver) {
ctx = ObserverContext.createAndPrepare(env, ctx);
try {
- ((RegionObserver)env.getInstance()).postCompact(ctx, store, resultFile);
+ ((RegionObserver) env.getInstance()).postCompact(ctx, store, resultFile, request);
} catch (Throwable e) {
handleCoprocessorThrowable(env, e);
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Wed Feb 20 22:40:55 2013
@@ -160,7 +160,8 @@ public interface Store extends HeapSize,
public CompactionRequest requestCompaction() throws IOException;
- public CompactionRequest requestCompaction(int priority) throws IOException;
+ public CompactionRequest requestCompaction(int priority, CompactionRequest request)
+ throws IOException;
public void finishRequest(CompactionRequest cr);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Wed Feb 20 22:40:55 2013
@@ -19,20 +19,22 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.GetRegionInfoResponse.CompactionState;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.HStore;
+import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.util.StringUtils;
@@ -46,35 +48,51 @@ import com.google.common.collect.Collect
/**
* This class holds all details necessary to run a compaction.
*/
-@InterfaceAudience.Private
+@InterfaceAudience.LimitedPrivate({ "coprocessor" })
+@InterfaceStability.Evolving
public class CompactionRequest implements Comparable<CompactionRequest>,
Runnable {
static final Log LOG = LogFactory.getLog(CompactionRequest.class);
private final HRegion region;
private final HStore store;
- private final CompactSelection compactSelection;
- private final long totalSize;
- private final boolean isMajor;
+ private CompactSelection compactSelection;
+ private long totalSize;
+ private boolean isMajor;
private int priority;
private final Long timeInNanos;
private HRegionServer server = null;
- public CompactionRequest(HRegion region, HStore store,
- CompactSelection files, boolean isMajor, int priority) {
- Preconditions.checkNotNull(region);
- Preconditions.checkNotNull(files);
+ public static CompactionRequest getRequestForTesting(Collection<StoreFile> selection,
+ boolean isMajor) {
+ return new CompactionRequest(null, null, new CompactSelection(new ArrayList<StoreFile>(
+ selection)), isMajor, 0, System.nanoTime());
+ }
+
+ /**
+ * Constructor for a custom compaction. Uses the setXXX methods to update the state of the
+ * compaction before being used.
+ */
+ public CompactionRequest(HRegion region, HStore store, int priority) {
+ this(region, store, null, false, priority, System
+ .nanoTime());
+ }
+
+ public CompactionRequest(HRegion r, HStore s, CompactSelection files, boolean isMajor, int p) {
+ // delegate to the internal constructor after checking basic preconditions
+ this(Preconditions.checkNotNull(r), s, Preconditions.checkNotNull(files), isMajor, p, System
+ .nanoTime());
+ }
+ private CompactionRequest(HRegion region, HStore store, CompactSelection files, boolean isMajor,
+ int priority, long startTime) {
this.region = region;
this.store = store;
- this.compactSelection = files;
- long sz = 0;
- for (StoreFile sf : files.getFilesToCompact()) {
- sz += sf.getReader().length();
- }
- this.totalSize = sz;
this.isMajor = isMajor;
this.priority = priority;
- this.timeInNanos = System.nanoTime();
+ this.timeInNanos = startTime;
+ if (files != null) {
+ this.setSelection(files);
+ }
}
/**
@@ -162,6 +180,28 @@ public class CompactionRequest implement
this.server = hrs;
}
+ /**
+ * Set the files (and, implicitly, the size of the compaction based on those files)
+ * @param files files that should be included in the compaction
+ */
+ public void setSelection(CompactSelection files) {
+ long sz = 0;
+ for (StoreFile sf : files.getFilesToCompact()) {
+ sz += sf.getReader().length();
+ }
+ this.totalSize = sz;
+ this.compactSelection = files;
+ }
+
+ /**
+ * Specify if this compaction should be a major compaction based on the state of the store
+ * @param isMajor <tt>true</tt> if the system determines that this compaction should be a major
+ * compaction
+ */
+ public void setIsMajor(boolean isMajor) {
+ this.isMajor = isMajor;
+ }
+
@Override
public String toString() {
String fsList = Joiner.on(", ").join(
@@ -200,12 +240,11 @@ public class CompactionRequest implement
if (completed) {
// degenerate case: blocked regions require recursive enqueues
if (store.getCompactPriority() <= 0) {
- server.compactSplitThread
- .requestCompaction(region, store, "Recursive enqueue");
- } else {
- // see if the compaction has caused us to exceed max region size
- server.compactSplitThread.requestSplit(region);
- }
+ server.compactSplitThread.requestCompaction(region, store, "Recursive enqueue", null);
+ } else {
+ // see if the compaction has caused us to exceed max region size
+ server.getCompactSplitThread().requestSplit(region);
+ }
}
} catch (IOException ex) {
LOG.error("Compaction failed " + this, RemoteExceptionHandler
@@ -234,4 +273,4 @@ public class CompactionRequest implement
}
}
}
- }
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java Wed Feb 20 22:40:55 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.regionserver.compactions;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
@@ -41,15 +42,28 @@ public abstract class Compactor {
/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.
- *
* @param filesToCompact which files to compact
- * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
- * @return Product of compaction or an empty list if all cells expired or deleted and
- * nothing made it through the compaction.
+ * @param request the requested compaction
+ * @return Product of compaction or an empty list if all cells expired or deleted and nothing made
+ * it through the compaction.
* @throws IOException
*/
- public abstract List<Path> compact(final Collection<StoreFile> filesToCompact,
- final boolean majorCompaction) throws IOException;
+ public abstract List<Path> compact(final CompactionRequest request) throws IOException;
+
+ /**
+ * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
+ * {@link #compact(CompactionRequest)};
+ * @param filesToCompact the files to compact. These are used as the compactionSelection for the
+ * generated {@link CompactionRequest}.
+ * @param isMajor true to major compact (prune all deletes, max versions, etc)
+ * @return Product of compaction or an empty list if all cells expired or deleted and nothing made
+ * it through the compaction.
+ * @throws IOException
+ */
+ public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
+ throws IOException {
+ return compact(CompactionRequest.getRequestForTesting(filesToCompact, isMajor));
+ }
public CompactionProgress getProgress() {
return this.progress;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java Wed Feb 20 22:40:55 2013
@@ -59,16 +59,12 @@ public class DefaultCompactor extends Co
/**
* Do a minor/major compaction on an explicit set of storefiles from a Store.
- *
- * @param filesToCompact which files to compact
- * @param majorCompaction true to major compact (prune all deletes, max versions, etc)
- * @return Product of compaction or an empty list if all cells expired or deleted and
- * nothing made it through the compaction.
- * @throws IOException
*/
@SuppressWarnings("deprecation")
- public List<Path> compact(final Collection<StoreFile> filesToCompact,
- final boolean majorCompaction) throws IOException {
+ @Override
+ public List<Path> compact(final CompactionRequest request) throws IOException {
+ final Collection<StoreFile> filesToCompact = request.getFiles();
+ boolean majorCompaction = request.isMajor();
// Max-sequenceID is the last key in the files we're compacting
long maxId = StoreFile.getMaxSequenceIdInList(filesToCompact, true);
@@ -139,7 +135,8 @@ public class DefaultCompactor extends Co
scanner = store
.getCoprocessorHost()
.preCompactScannerOpen(store, scanners,
- majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs);
+ majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT, earliestPutTs,
+ request);
}
ScanType scanType = majorCompaction? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT;
if (scanner == null) {
@@ -150,11 +147,11 @@ public class DefaultCompactor extends Co
scanType, smallestReadPoint, earliestPutTs);
}
if (store.getCoprocessorHost() != null) {
- InternalScanner cpScanner =
- store.getCoprocessorHost().preCompact(store, scanner, scanType);
+ InternalScanner cpScanner = store.getCoprocessorHost().preCompact(store, scanner,
+ scanType, request);
// NULL scanner returned from coprocessor hooks means skip normal processing
if (cpScanner == null) {
- return newFiles; // an empty list
+ return newFiles; // an empty list
}
scanner = cpScanner;
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1448449&r1=1448448&r2=1448449&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Wed Feb 20 22:40:55 2013
@@ -28,6 +28,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.concurrent.CountDownLatch;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -47,13 +48,17 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
-import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoderImpl;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
-import org.apache.hadoop.hbase.regionserver.compactions.*;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -591,7 +596,7 @@ public class TestCompaction extends HBas
Collection<StoreFile> storeFiles = store.getStorefiles();
Compactor tool = store.compactor;
- List<Path> newFiles = tool.compact(storeFiles, false);
+ List<Path> newFiles = tool.compactForTesting(storeFiles, false);
// Now lets corrupt the compacted file.
FileSystem fs = FileSystem.get(conf);
@@ -630,7 +635,7 @@ public class TestCompaction extends HBas
}
store.triggerMajorCompaction();
- CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY);
+ CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null);
assertNotNull("Expected to receive a compaction request", request);
assertEquals(
"System-requested major compaction should not occur if there are too many store files",
@@ -648,7 +653,7 @@ public class TestCompaction extends HBas
createStoreFile(r);
}
store.triggerMajorCompaction();
- CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER);
+ CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null);
assertNotNull("Expected to receive a compaction request", request);
assertEquals(
"User-requested major compaction should always occur, even if there are too many store files",
@@ -656,5 +661,53 @@ public class TestCompaction extends HBas
request.isMajor());
}
-}
+ /**
+ * Create a custom compaction request and be sure that we can track it through the queue, knowing
+ * when the compaction is completed.
+ */
+ public void testTrackingCompactionRequest() throws Exception {
+ // setup a compact/split thread on a mock server
+ HRegionServer mockServer = Mockito.mock(HRegionServer.class);
+ Mockito.when(mockServer.getConfiguration()).thenReturn(r.getBaseConf());
+ CompactSplitThread thread = new CompactSplitThread(mockServer);
+ Mockito.when(mockServer.getCompactSplitThread()).thenReturn(thread);
+
+ // setup a region/store with some files
+ Store store = r.getStore(COLUMN_FAMILY);
+ createStoreFile(r);
+ for (int i = 0; i < MAX_FILES_TO_COMPACT + 1; i++) {
+ createStoreFile(r);
+ }
+
+ CountDownLatch latch = new CountDownLatch(1);
+ TrackableCompactionRequest request = new TrackableCompactionRequest(r, (HStore) store, latch);
+ thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request);
+ // wait for the latch to complete.
+ latch.await();
+ thread.interruptIfNecessary();
+ }
+
+ /**
+ * Simple {@link CompactionRequest} on which you can wait until the requested compaction finishes.
+ */
+ public static class TrackableCompactionRequest extends CompactionRequest {
+ private CountDownLatch done;
+
+ /**
+ * Constructor for a custom compaction. Uses the setXXX methods to update the state of the
+ * compaction before being used.
+ */
+ public TrackableCompactionRequest(HRegion region, HStore store, CountDownLatch finished) {
+ super(region, store, Store.PRIORITY_USER);
+ this.done = finished;
+ }
+
+ @Override
+ public void run() {
+ super.run();
+ this.done.countDown();
+ }
+ }
+
+}