You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2013/02/26 21:52:00 UTC
svn commit: r1450407 [1/2] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/coprocessor/
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/compactions/
test/java/org/apache/hadoop/hbase/...
Author: sershe
Date: Tue Feb 26 20:51:59 2013
New Revision: 1450407
URL: http://svn.apache.org/r1450407
Log:
HBASE-7843 Enable encapsulating compaction policy/compactor/store file manager interaction shenanigans
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java
Removed:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactSelection.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestDefaultCompactSelection.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/PerfTestCompactionPolicies.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/coprocessor/BaseRegionObserver.java Tue Feb 26 20:51:59 2013
@@ -41,7 +41,6 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.ScanType;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java Tue Feb 26 20:51:59 2013
@@ -26,6 +26,7 @@ import java.util.concurrent.BlockingQueu
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -34,7 +35,12 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.RemoteExceptionHandler;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
@@ -89,8 +95,7 @@ public class CompactSplitThread implemen
return t;
}
});
- this.largeCompactions
- .setRejectedExecutionHandler(new CompactionRequest.Rejection());
+ this.largeCompactions.setRejectedExecutionHandler(new Rejection());
this.smallCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
60, TimeUnit.SECONDS, new PriorityBlockingQueue<Runnable>(),
new ThreadFactory() {
@@ -102,7 +107,7 @@ public class CompactSplitThread implemen
}
});
this.smallCompactions
- .setRejectedExecutionHandler(new CompactionRequest.Rejection());
+ .setRejectedExecutionHandler(new Rejection());
this.splits = (ThreadPoolExecutor)
Executors.newFixedThreadPool(splitThreads,
new ThreadFactory() {
@@ -193,7 +198,7 @@ public class CompactSplitThread implemen
@Override
public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
- List<CompactionRequest> requests) throws IOException {
+ List<Pair<CompactionRequest, Store>> requests) throws IOException {
return requestCompaction(r, why, Store.NO_PRIORITY, requests);
}
@@ -205,7 +210,7 @@ public class CompactSplitThread implemen
@Override
public synchronized List<CompactionRequest> requestCompaction(final HRegion r, final String why,
- int p, List<CompactionRequest> requests) throws IOException {
+ int p, List<Pair<CompactionRequest, Store>> requests) throws IOException {
// not a special compaction request, so make our own list
List<CompactionRequest> ret;
if (requests == null) {
@@ -215,8 +220,8 @@ public class CompactSplitThread implemen
}
} else {
ret = new ArrayList<CompactionRequest>(requests.size());
- for (CompactionRequest request : requests) {
- requests.add(requestCompaction(r, request.getStore(), why, p, request));
+ for (Pair<CompactionRequest, Store> pair : requests) {
+ ret.add(requestCompaction(r, pair.getSecond(), why, p, pair.getFirst()));
}
}
return ret;
@@ -235,28 +240,29 @@ public class CompactSplitThread implemen
if (this.server.isStopped()) {
return null;
}
- CompactionRequest cr = s.requestCompaction(priority, request);
- if (cr != null) {
- cr.setServer(server);
- if (priority != Store.NO_PRIORITY) {
- cr.setPriority(priority);
- }
- ThreadPoolExecutor pool = s.throttleCompaction(cr.getSize())
- ? largeCompactions : smallCompactions;
- pool.execute(cr);
- if (LOG.isDebugEnabled()) {
- String type = (pool == smallCompactions) ? "Small " : "Large ";
- LOG.debug(type + "Compaction requested: " + cr
- + (why != null && !why.isEmpty() ? "; Because: " + why : "")
- + "; " + this);
- }
- } else {
+ CompactionContext compaction = s.requestCompaction(priority, request);
+ if (compaction == null) {
if(LOG.isDebugEnabled()) {
LOG.debug("Not compacting " + r.getRegionNameAsString() +
" because compaction request was cancelled");
}
+ return null;
+ }
+
+ assert compaction.hasSelection();
+ if (priority != Store.NO_PRIORITY) {
+ compaction.getRequest().setPriority(priority);
}
- return cr;
+ ThreadPoolExecutor pool = s.throttleCompaction(compaction.getRequest().getSize())
+ ? largeCompactions : smallCompactions;
+ pool.execute(new CompactionRunner(s, r, compaction));
+ if (LOG.isDebugEnabled()) {
+ String type = (pool == smallCompactions) ? "Small " : "Large ";
+ LOG.debug(type + "Compaction requested: " + compaction
+ + (why != null && !why.isEmpty() ? "; Because: " + why : "")
+ + "; " + this);
+ }
+ return compaction.getRequest();
}
/**
@@ -309,4 +315,73 @@ public class CompactSplitThread implemen
public int getRegionSplitLimit() {
return this.regionSplitLimit;
}
+
+ private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
+ private final Store store;
+ private final HRegion region;
+ private final CompactionContext compaction;
+
+ public CompactionRunner(Store store, HRegion region, CompactionContext compaction) {
+ super();
+ this.store = store;
+ this.region = region;
+ this.compaction = compaction;
+ }
+
+ @Override
+ public void run() {
+ Preconditions.checkNotNull(server);
+ if (server.isStopped()) {
+ return;
+ }
+ this.compaction.getRequest().beforeExecute();
+ try {
+ // Note: please don't put single-compaction logic here;
+ // put it into region/store/etc. This is CompactSplitThread (CST) logic.
+ long start = EnvironmentEdgeManager.currentTimeMillis();
+ boolean completed = region.compact(compaction, store);
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ LOG.info(((completed) ? "Completed" : "Aborted") + " compaction: " +
+ this + "; duration=" + StringUtils.formatTimeDiff(now, start));
+ if (completed) {
+ // degenerate case: blocked regions require recursive enqueues
+ if (store.getCompactPriority() <= 0) {
+ requestCompaction(region, store, "Recursive enqueue", null);
+ } else {
+ // see if the compaction has caused us to exceed max region size
+ requestSplit(region);
+ }
+ }
+ } catch (IOException ex) {
+ LOG.error("Compaction failed " + this, RemoteExceptionHandler.checkIOException(ex));
+ server.checkFileSystem();
+ } catch (Exception ex) {
+ LOG.error("Compaction failed " + this, ex);
+ server.checkFileSystem();
+ } finally {
+ LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
+ }
+ this.compaction.getRequest().afterExecute();
+ }
+
+ @Override
+ public int compareTo(CompactionRunner o) {
+ // Only compare the underlying request, for queue sorting purposes.
+ return this.compaction.getRequest().compareTo(o.compaction.getRequest());
+ }
+ }
+
+ /**
+ * Cleanup class to use when rejecting a compaction request from the queue.
+ */
+ private static class Rejection implements RejectedExecutionHandler {
+ @Override
+ public void rejectedExecution(Runnable runnable, ThreadPoolExecutor pool) {
+ if (runnable instanceof CompactionRunner) {
+ CompactionRunner runner = (CompactionRunner)runnable;
+ LOG.debug("Compaction Rejected: " + runner);
+ runner.store.cancelRequestedCompaction(runner.compaction);
+ }
+ }
+ }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionRequestor.java Tue Feb 26 20:51:59 2013
@@ -23,6 +23,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.util.Pair;
@InterfaceAudience.Private
public interface CompactionRequestor {
@@ -47,7 +48,7 @@ public interface CompactionRequestor {
* @throws IOException
*/
public List<CompactionRequest> requestCompaction(final HRegion r, final String why,
- List<CompactionRequest> requests)
+ List<Pair<CompactionRequest, Store>> requests)
throws IOException;
/**
@@ -74,7 +75,7 @@ public interface CompactionRequestor {
* @throws IOException
*/
public List<CompactionRequest> requestCompaction(final HRegion r, final String why, int pri,
- List<CompactionRequest> requests) throws IOException;
+ List<Pair<CompactionRequest, Store>> requests) throws IOException;
/**
* @param r Region to compact
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactionTool.java Tue Feb 26 20:51:59 2013
@@ -56,7 +56,7 @@ import org.apache.hadoop.hbase.HDFSBlock
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.mapreduce.JobUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
@@ -156,8 +156,9 @@ public class CompactionTool extends Conf
" family=" + familyDir.getName());
HStore store = getStore(region, familyDir);
do {
- CompactionRequest cr = store.requestCompaction();
- List<StoreFile> storeFiles = store.compact(cr);
+ CompactionContext compaction = store.requestCompaction();
+ if (compaction == null) break;
+ List<StoreFile> storeFiles = store.compact(compaction);
if (storeFiles != null && !storeFiles.isEmpty()) {
if (keepCompactedFiles && deleteCompacted) {
for (StoreFile storeFile: storeFiles) {
@@ -465,4 +466,4 @@ public class CompactionTool extends Conf
public static void main(String[] args) throws Exception {
System.exit(ToolRunner.run(HBaseConfiguration.create(), new CompactionTool(), args));
}
-}
\ No newline at end of file
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java Tue Feb 26 20:51:59 2013
@@ -18,28 +18,64 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
/**
- * Default StoreEngine creates the default compactor, policy, and store file manager.
+ * Default StoreEngine creates the default compactor, policy, and store file manager, or
+ * their derivatives.
*/
@InterfaceAudience.Private
-public class DefaultStoreEngine extends StoreEngine {
+public class DefaultStoreEngine extends StoreEngine<
+ DefaultCompactionPolicy, DefaultCompactor, DefaultStoreFileManager> {
+
public DefaultStoreEngine(Configuration conf, Store store, KVComparator comparator) {
super(conf, store, comparator);
}
@Override
- protected void createComponents(PP<StoreFileManager> storeFileManager,
- PP<CompactionPolicy> compactionPolicy, PP<Compactor> compactor) {
- storeFileManager.set(new DefaultStoreFileManager(this.comparator));
- compactionPolicy.set(new DefaultCompactionPolicy(this.conf, this.store));
- compactor.set(new DefaultCompactor(this.conf, this.store));
+ protected void createComponents() {
+ storeFileManager = new DefaultStoreFileManager(this.comparator);
+
+ // TODO: compactor and policy may be separately pluggable, but must derive from default ones.
+ compactor = new DefaultCompactor(this.conf, this.store);
+ compactionPolicy = new DefaultCompactionPolicy(this.conf, this.store/*as StoreConfigInfo*/);
+ }
+
+ @Override
+ protected CompactionContext createCompactionContext() {
+ return new DefaultCompactionContext();
+ }
+
+ private class DefaultCompactionContext extends CompactionContext {
+ @Override
+ public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
+ boolean mayUseOffPeak, boolean forceMajor) throws IOException {
+ request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
+ filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
+ return request != null;
+ }
+
+ @Override
+ public List<Path> compact() throws IOException {
+ return compactor.compact(request);
+ }
+
+ @Override
+ public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
+ return compactionPolicy.preSelectCompactionForCoprocessor(
+ storeFileManager.getStorefiles(), filesCompacting);
+ }
}
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Feb 26 20:51:59 2013
@@ -119,6 +119,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.CoprocessorServiceCall;
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.SnapshotDescription;
import org.apache.hadoop.hbase.regionserver.MultiVersionConsistencyControl.WriteEntry;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogFactory;
@@ -1291,13 +1292,9 @@ public class HRegion implements HeapSize
*/
public void compactStores() throws IOException {
for (Store s : getStores().values()) {
- CompactionRequest cr = s.requestCompaction();
- if(cr != null) {
- try {
- compact(cr);
- } finally {
- s.finishRequest(cr);
- }
+ CompactionContext compaction = s.requestCompaction();
+ if (compaction != null) {
+ compact(compaction, s);
}
}
}
@@ -1317,45 +1314,46 @@ public class HRegion implements HeapSize
* @return whether the compaction completed
* @throws IOException e
*/
- public boolean compact(CompactionRequest cr)
- throws IOException {
- if (cr == null) {
- return false;
- }
+ public boolean compact(CompactionContext compaction, Store store) throws IOException {
+ assert compaction != null && compaction.hasSelection();
+ assert !compaction.getRequest().getFiles().isEmpty();
if (this.closing.get() || this.closed.get()) {
LOG.debug("Skipping compaction on " + this + " because closing/closed");
+ store.cancelRequestedCompaction(compaction);
return false;
}
- Preconditions.checkArgument(cr.getHRegion().equals(this));
MonitoredTask status = null;
+ boolean didPerformCompaction = false;
// block waiting for the lock for compaction
lock.readLock().lock();
try {
- status = TaskMonitor.get().createStatus(
- "Compacting " + cr.getStore() + " in " + this);
+ status = TaskMonitor.get().createStatus("Compacting " + store + " in " + this);
if (this.closed.get()) {
- LOG.debug("Skipping compaction on " + this + " because closed");
+ String msg = "Skipping compaction on " + this + " because closed";
+ LOG.debug(msg);
+ status.abort(msg);
return false;
}
- boolean decr = true;
+ boolean wasStateSet = false;
try {
synchronized (writestate) {
if (writestate.writesEnabled) {
+ wasStateSet = true;
++writestate.compacting;
} else {
String msg = "NOT compacting region " + this + ". Writes disabled.";
LOG.info(msg);
status.abort(msg);
- decr = false;
return false;
}
}
- LOG.info("Starting compaction on " + cr.getStore() + " in region "
- + this + (cr.getCompactSelection().isOffPeakCompaction()?" as an off-peak compaction":""));
+ LOG.info("Starting compaction on " + store + " in region " + this
+ + (compaction.getRequest().isOffPeak()?" as an off-peak compaction":""));
doRegionCompactionPrep();
try {
- status.setStatus("Compacting store " + cr.getStore());
- cr.getStore().compact(cr);
+ status.setStatus("Compacting store " + store);
+ didPerformCompaction = true;
+ store.compact(compaction);
} catch (InterruptedIOException iioe) {
String msg = "compaction interrupted";
LOG.info(msg, iioe);
@@ -1363,7 +1361,7 @@ public class HRegion implements HeapSize
return false;
}
} finally {
- if (decr) {
+ if (wasStateSet) {
synchronized (writestate) {
--writestate.compacting;
if (writestate.compacting <= 0) {
@@ -1376,6 +1374,7 @@ public class HRegion implements HeapSize
return true;
} finally {
try {
+ if (!didPerformCompaction) store.cancelRequestedCompaction(compaction);
if (status != null) status.cleanup();
} finally {
lock.readLock().unlock();
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Tue Feb 26 20:51:59 2013
@@ -66,7 +66,7 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.exceptions.InvalidHFileException;
import org.apache.hadoop.hbase.io.hfile.NoOpDataBlockEncoder;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -154,8 +154,8 @@ public class HStore implements Store {
// Comparing KeyValues
private final KeyValue.KVComparator comparator;
- final Compactor compactor;
-
+ final StoreEngine<?, ?, ?> storeEngine;
+
private OffPeakCompactions offPeakCompactions;
private static final int DEFAULT_FLUSH_RETRIES_NUMBER = 10;
@@ -223,8 +223,11 @@ public class HStore implements Store {
"hbase.hstore.close.check.interval", 10*1000*1000 /* 10 MB */);
}
- StoreEngine engine = StoreEngine.create(this, this.conf, this.comparator);
- this.storeFileManager = engine.getStoreFileManager();
+ storeEngine = StoreEngine.create(this, this.conf, this.comparator);
+ // Copy some things to local fields for convenience.
+ this.storeFileManager = storeEngine.getStoreFileManager();
+ this.compactionPolicy = storeEngine.getCompactionPolicy();
+
this.storeFileManager.loadFiles(loadStoreFiles());
// Initialize checksum type from name. The names are CRC32, CRC32C, etc.
@@ -243,9 +246,6 @@ public class HStore implements Store {
+ HStore.flush_retries_number);
}
}
- this.compactionPolicy = engine.getCompactionPolicy();
- // Get the compaction tool instance for this policy
- this.compactor = engine.getCompactor();
}
/**
@@ -1067,15 +1067,15 @@ public class HStore implements Store {
* <p>We don't want to hold the structureLock for the whole time, as a compact()
* can be lengthy and we want to allow cache-flushes during this period.
*
- * @param cr
- * compaction details obtained from requestCompaction()
+ * @param compaction compaction details obtained from requestCompaction()
* @throws IOException
* @return Storefile we compacted into or null if we failed or opted out early.
*/
- List<StoreFile> compact(CompactionRequest cr) throws IOException {
- if (cr == null || cr.getFiles().isEmpty()) return null;
- Preconditions.checkArgument(cr.getStore().toString().equals(this.toString()));
- List<StoreFile> filesToCompact = cr.getFiles();
+ public List<StoreFile> compact(CompactionContext compaction) throws IOException {
+ assert compaction != null && compaction.hasSelection();
+ CompactionRequest cr = compaction.getRequest();
+ Collection<StoreFile> filesToCompact = cr.getFiles();
+ assert !filesToCompact.isEmpty();
synchronized (filesCompacting) {
// sanity check: we're compacting files that this store knows about
// TODO: change this to LOG.error() after more debugging
@@ -1091,16 +1091,20 @@ public class HStore implements Store {
List<StoreFile> sfs = new ArrayList<StoreFile>();
long compactionStartTime = EnvironmentEdgeManager.currentTimeMillis();
try {
- List<Path> newFiles = this.compactor.compact(cr);
+ // Commence the compaction.
+ List<Path> newFiles = compaction.compact();
// Move the compaction into place.
if (this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
for (Path newFile: newFiles) {
- StoreFile sf = completeCompaction(filesToCompact, newFile);
+ assert newFile != null;
+ StoreFile sf = moveFileIntoPlace(newFile);
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompact(this, sf, cr);
}
+ assert sf != null;
sfs.add(sf);
}
+ completeCompaction(filesToCompact, sfs);
} else {
for (Path newFile: newFiles) {
// Create storefile around what we wrote with a reader on it.
@@ -1111,15 +1115,24 @@ public class HStore implements Store {
}
}
} finally {
- synchronized (filesCompacting) {
- filesCompacting.removeAll(filesToCompact);
- }
+ finishCompactionRequest(cr);
}
+ logCompactionEndMessage(cr, sfs, compactionStartTime);
+ return sfs;
+ }
+ /**
+ * Log a very elaborate compaction completion message.
+ * @param cr Request.
+ * @param sfs Resulting files.
+ * @param compactionStartTime Start time.
+ */
+ private void logCompactionEndMessage(
+ CompactionRequest cr, List<StoreFile> sfs, long compactionStartTime) {
long now = EnvironmentEdgeManager.currentTimeMillis();
StringBuilder message = new StringBuilder(
"Completed" + (cr.isMajor() ? " major " : " ") + "compaction of "
- + filesToCompact.size() + " file(s) in " + this + " of "
+ + cr.getFiles().size() + " file(s) in " + this + " of "
+ this.region.getRegionInfo().getRegionNameAsString()
+ " into ");
if (sfs.isEmpty()) {
@@ -1139,7 +1152,23 @@ public class HStore implements Store {
.append(", and took ").append(StringUtils.formatTimeDiff(now, compactionStartTime))
.append(" to execute.");
LOG.info(message.toString());
- return sfs;
+ }
+
+ // Package-visible for tests
+ StoreFile moveFileIntoPlace(Path newFile) throws IOException {
+ validateStoreFile(newFile);
+ // Move the file into the right spot
+ Path destPath = new Path(homedir, newFile.getName());
+ LOG.info("Renaming compacted file at " + newFile + " to " + destPath);
+ if (!fs.rename(newFile, destPath)) {
+ String err = "Failed move of compacted file " + newFile + " to " + destPath;
+ LOG.error(err);
+ throw new IOException(err);
+ }
+ StoreFile result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
+ this.family.getBloomFilterType(), this.dataBlockEncoder);
+ result.createReader();
+ return result;
}
/**
@@ -1181,13 +1210,17 @@ public class HStore implements Store {
try {
// Ready to go. Have list of files to compact.
- List<Path> newFiles = this.compactor.compactForTesting(filesToCompact, isMajor);
+ List<Path> newFiles =
+ this.storeEngine.getCompactor().compactForTesting(filesToCompact, isMajor);
for (Path newFile: newFiles) {
// Move the compaction into place.
- StoreFile sf = completeCompaction(filesToCompact, newFile);
+ StoreFile sf = moveFileIntoPlace(newFile);
if (region.getCoprocessorHost() != null) {
region.getCoprocessorHost().postCompact(this, sf, null);
}
+ ArrayList<StoreFile> tmp = new ArrayList<StoreFile>();
+ tmp.add(sf);
+ completeCompaction(filesToCompact, tmp);
}
} finally {
synchronized (filesCompacting) {
@@ -1203,7 +1236,7 @@ public class HStore implements Store {
@Override
public CompactionProgress getCompactionProgress() {
- return this.compactor.getProgress();
+ return this.storeEngine.getCompactor().getProgress();
}
@Override
@@ -1219,100 +1252,102 @@ public class HStore implements Store {
}
@Override
- public CompactionRequest requestCompaction() throws IOException {
+ public CompactionContext requestCompaction() throws IOException {
return requestCompaction(Store.NO_PRIORITY, null);
}
@Override
- public CompactionRequest requestCompaction(int priority, CompactionRequest request)
+ public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
throws IOException {
// don't even select for compaction if writes are disabled
if (!this.region.areWritesEnabled()) {
return null;
}
+ CompactionContext compaction = storeEngine.createCompaction();
this.lock.readLock().lock();
try {
- List<StoreFile> candidates = Lists.newArrayList(storeFileManager.getStorefiles());
synchronized (filesCompacting) {
- // First we need to pre-select compaction, and then pre-compact selection!
- candidates = compactionPolicy.preSelectCompaction(candidates, filesCompacting);
- boolean override = false;
+ // First, see if coprocessor would want to override selection.
if (region.getCoprocessorHost() != null) {
- override = region.getCoprocessorHost().preCompactSelection(this, candidates, request);
+ List<StoreFile> candidatesForCoproc = compaction.preSelect(this.filesCompacting);
+ boolean override = region.getCoprocessorHost().preCompactSelection(
+ this, candidatesForCoproc, baseRequest);
+ if (override) {
+ // Coprocessor is overriding normal file selection.
+ compaction.forceSelect(new CompactionRequest(candidatesForCoproc));
+ }
}
- CompactSelection filesToCompact;
- if (override) {
- // coprocessor is overriding normal file selection
- filesToCompact = new CompactSelection(candidates);
- } else {
+
+ // Normal case - coprocessor is not overriding file selection.
+ if (!compaction.hasSelection()) {
boolean isUserCompaction = priority == Store.PRIORITY_USER;
boolean mayUseOffPeak = this.offPeakCompactions.tryStartOffPeakRequest();
- filesToCompact = compactionPolicy.selectCompaction(candidates, isUserCompaction,
+ compaction.select(this.filesCompacting, isUserCompaction,
mayUseOffPeak, forceMajor && filesCompacting.isEmpty());
- if (mayUseOffPeak && !filesToCompact.isOffPeakCompaction()) {
- // Compaction policy doesn't want to do anything with off-peak.
+ assert compaction.hasSelection();
+ if (mayUseOffPeak && !compaction.getRequest().isOffPeak()) {
+ // Compaction policy doesn't want to take advantage of off-peak.
this.offPeakCompactions.endOffPeakRequest();
}
}
-
if (region.getCoprocessorHost() != null) {
- region.getCoprocessorHost().postCompactSelection(this,
- ImmutableList.copyOf(filesToCompact.getFilesToCompact()), request);
+ region.getCoprocessorHost().postCompactSelection(
+ this, ImmutableList.copyOf(compaction.getRequest().getFiles()), baseRequest);
+ }
+
+ // Selected files; see if we have a compaction with some custom base request.
+ if (baseRequest != null) {
+ // Update the request with what the system thinks the request should be;
+ // it's up to the request whether it wants to listen.
+ compaction.forceSelect(
+ baseRequest.combineWith(compaction.getRequest()));
}
- // no files to compact
- if (filesToCompact.getFilesToCompact().isEmpty()) {
+ // Finally, we have the resulting files list. Check if we have any files at all.
+ final Collection<StoreFile> selectedFiles = compaction.getRequest().getFiles();
+ if (selectedFiles.isEmpty()) {
return null;
}
- // basic sanity check: do not try to compact the same StoreFile twice.
- if (!Collections.disjoint(filesCompacting, filesToCompact.getFilesToCompact())) {
+ // Update filesCompacting (check that we do not try to compact the same StoreFile twice).
+ if (!Collections.disjoint(filesCompacting, selectedFiles)) {
// TODO: change this from an IAE to LOG.error after sufficient testing
Preconditions.checkArgument(false, "%s overlaps with %s",
- filesToCompact, filesCompacting);
+ selectedFiles, filesCompacting);
}
- filesCompacting.addAll(filesToCompact.getFilesToCompact());
+ filesCompacting.addAll(selectedFiles);
Collections.sort(filesCompacting, StoreFile.Comparators.SEQ_ID);
- boolean isMajor =
- (filesToCompact.getFilesToCompact().size() == this.getStorefilesCount());
- if (isMajor) {
- // since we're enqueuing a major, update the compaction wait interval
- this.forceMajor = false;
- }
-
- LOG.debug(getRegionInfo().getEncodedName() + " - " +
- getColumnFamilyName() + ": Initiating " +
- (isMajor ? "major" : "minor") + " compaction");
-
- // everything went better than expected. create a compaction request
- int pri = getCompactPriority(priority);
- //not a special compaction request, so we need to make one
- if(request == null){
- request = new CompactionRequest(region, this, filesToCompact, isMajor, pri);
- }else{
- //update the request with what the system thinks the request should be
- //its up to the request if it wants to listen
- request.setSelection(filesToCompact);
- request.setIsMajor(isMajor);
- request.setPriority(pri);
- }
+ // If we're enqueuing a major, clear the force flag.
+ boolean isMajor = selectedFiles.size() == this.getStorefilesCount();
+ this.forceMajor = this.forceMajor && !isMajor;
+
+ // Set common request properties.
+ compaction.getRequest().setPriority(getCompactPriority(priority));
+ compaction.getRequest().setIsMajor(isMajor);
+ compaction.getRequest().setDescription(
+ region.getRegionNameAsString(), getColumnFamilyName());
}
} finally {
this.lock.readLock().unlock();
}
- if (request != null) {
- this.region.reportCompactionRequestStart(request.isMajor());
- }
- return request;
+
+ LOG.debug(getRegionInfo().getEncodedName() + " - " + getColumnFamilyName() + ": Initiating "
+ + (compaction.getRequest().isMajor() ? "major" : "minor") + " compaction");
+ this.region.reportCompactionRequestStart(compaction.getRequest().isMajor());
+ return compaction;
+ }
+
+ public void cancelRequestedCompaction(CompactionContext compaction) {
+ finishCompactionRequest(compaction.getRequest());
}
- public void finishRequest(CompactionRequest cr) {
+ private void finishCompactionRequest(CompactionRequest cr) {
this.region.reportCompactionRequestEnd(cr.isMajor());
- if (cr.getCompactSelection().isOffPeakCompaction()) {
+ if (cr.isOffPeak()) {
this.offPeakCompactions.endOffPeakRequest();
- cr.getCompactSelection().setOffPeak(false);
+ cr.setOffPeak(false);
}
synchronized (filesCompacting) {
filesCompacting.removeAll(cr.getFiles());
@@ -1363,28 +1398,8 @@ public class HStore implements Store {
* @return StoreFile created. May be null.
* @throws IOException
*/
- StoreFile completeCompaction(final Collection<StoreFile> compactedFiles,
- final Path newFile)
- throws IOException {
- // 1. Moving the new files into place -- if there is a new file (may not
- // be if all cells were expired or deleted).
- StoreFile result = null;
- if (newFile != null) {
- validateStoreFile(newFile);
- // Move the file into the right spot
- Path destPath = new Path(homedir, newFile.getName());
- LOG.info("Renaming compacted file at " + newFile + " to " + destPath);
- if (!fs.rename(newFile, destPath)) {
- LOG.error("Failed move of compacted file " + newFile + " to " +
- destPath);
- throw new IOException("Failed move of compacted file " + newFile +
- " to " + destPath);
- }
- result = new StoreFile(this.fs, destPath, this.conf, this.cacheConf,
- this.family.getBloomFilterType(), this.dataBlockEncoder);
- result.createReader();
- }
-
+ private void completeCompaction(final Collection<StoreFile> compactedFiles,
+ final Collection<StoreFile> result) throws IOException {
try {
this.lock.writeLock().lock();
try {
@@ -1392,11 +1407,7 @@ public class HStore implements Store {
// delete old store files until we have sent out notification of
// change in case old files are still being accessed by outstanding
// scanners.
- List<StoreFile> results = new ArrayList<StoreFile>(1);
- if (result != null) {
- results.add(result);
- }
- this.storeFileManager.addCompactionResults(compactedFiles, results);
+ this.storeFileManager.addCompactionResults(compactedFiles, result);
filesCompacting.removeAll(compactedFiles); // safe bc: lock.writeLock()
} finally {
// We need the lock, as long as we are updating the storeFiles
@@ -1418,8 +1429,8 @@ public class HStore implements Store {
} catch (IOException e) {
e = RemoteExceptionHandler.checkIOException(e);
LOG.error("Failed replacing compacted files in " + this +
- ". Compacted file is " + (result == null? "none": result.toString()) +
- ". Files replaced " + compactedFiles.toString() +
+ ". Compacted files are " + (result == null? "none": result.toString()) +
+ ". Files replaced " + compactedFiles.toString() +
" some of which may have been already removed", e);
}
@@ -1435,7 +1446,6 @@ public class HStore implements Store {
this.storeSize += r.length();
this.totalUncompressedBytes += r.getTotalUncompressedBytes();
}
- return result;
}
/*
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionCoprocessorHost.java Tue Feb 26 20:51:59 2013
@@ -55,7 +55,6 @@ import org.apache.hadoop.hbase.coprocess
import org.apache.hadoop.hbase.filter.ByteArrayComparable;
import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactSelection;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Feb 26 20:51:59 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.io.HeapSi
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFileDataBlockEncoder;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
@@ -157,12 +158,14 @@ public interface Store extends HeapSize,
*/
public CompactionProgress getCompactionProgress();
- public CompactionRequest requestCompaction() throws IOException;
+ public CompactionContext requestCompaction() throws IOException;
- public CompactionRequest requestCompaction(int priority, CompactionRequest request)
+ public CompactionContext requestCompaction(int priority, CompactionRequest baseRequest)
throws IOException;
- public void finishRequest(CompactionRequest cr);
+ public void cancelRequestedCompaction(CompactionContext compaction);
+
+ public List<StoreFile> compact(CompactionContext compaction) throws IOException;
/**
* @return true if we should run a major compaction.
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java Tue Feb 26 20:51:59 2013
@@ -24,8 +24,11 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionPolicy;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.util.ReflectionUtils;
/**
@@ -34,14 +37,15 @@ import org.apache.hadoop.hbase.util.Refl
* they are tied together and replaced together via StoreEngine-s.
*/
@InterfaceAudience.Private
-public abstract class StoreEngine {
+public abstract class StoreEngine<
+ CP extends CompactionPolicy, C extends Compactor, SFM extends StoreFileManager> {
protected final Store store;
protected final Configuration conf;
protected final KVComparator comparator;
- private final PP<CompactionPolicy> compactionPolicy = new PP<CompactionPolicy>();
- private final PP<Compactor> compactor = new PP<Compactor>();
- private final PP<StoreFileManager> storeFileManager = new PP<StoreFileManager>();
+ protected CP compactionPolicy;
+ protected C compactor;
+ protected SFM storeFileManager;
private boolean isInitialized = false;
/**
@@ -50,7 +54,7 @@ public abstract class StoreEngine {
*/
public static final String STORE_ENGINE_CLASS_KEY = "hbase.hstore.engine.class";
- private static final Class<? extends StoreEngine>
+ private static final Class<? extends StoreEngine<?, ?, ?>>
DEFAULT_STORE_ENGINE_CLASS = DefaultStoreEngine.class;
/**
@@ -58,7 +62,7 @@ public abstract class StoreEngine {
*/
public CompactionPolicy getCompactionPolicy() {
createComponentsOnce();
- return this.compactionPolicy.get();
+ return this.compactionPolicy;
}
/**
@@ -66,7 +70,7 @@ public abstract class StoreEngine {
*/
public Compactor getCompactor() {
createComponentsOnce();
- return this.compactor.get();
+ return this.compactor;
}
/**
@@ -74,7 +78,7 @@ public abstract class StoreEngine {
*/
public StoreFileManager getStoreFileManager() {
createComponentsOnce();
- return this.storeFileManager.get();
+ return this.storeFileManager;
}
protected StoreEngine(Configuration conf, Store store, KVComparator comparator) {
@@ -83,18 +87,22 @@ public abstract class StoreEngine {
this.comparator = comparator;
}
+ public CompactionContext createCompaction() {
+ createComponentsOnce();
+ return this.createCompactionContext();
+ }
+
+ protected abstract CompactionContext createCompactionContext();
+
/**
* Create the StoreEngine's components.
- * @param storeFileManager out parameter for StoreFileManager.
- * @param compactionPolicy out parameter for CompactionPolicy.
- * @param compactor out parameter for Compactor.
*/
- protected abstract void createComponents(PP<StoreFileManager> storeFileManager,
- PP<CompactionPolicy> compactionPolicy, PP<Compactor> compactor);
+ protected abstract void createComponents();
private void createComponentsOnce() {
if (isInitialized) return;
- createComponents(storeFileManager, compactionPolicy, compactor);
+ createComponents();
+ assert compactor != null && compactionPolicy != null && storeFileManager != null;
isInitialized = true;
}
@@ -117,18 +125,4 @@ public abstract class StoreEngine {
throw new IOException("Unable to load configured store engine '" + className + "'", e);
}
}
-
- /**
- * To allow StoreEngine-s to have custom dependencies between 3 components, we want to create
- * them in one place. To return multiple, simulate C++ pointer to pointers/C# out params.
- */
- protected static class PP<T> {
- private T t = null;
- public void set(T t) {
- this.t = t;
- }
- public T get() {
- return this.t;
- }
- }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java Tue Feb 26 20:51:59 2013
@@ -39,7 +39,7 @@ import com.google.common.collect.Immutab
* Implementations are assumed to be not thread safe.
*/
@InterfaceAudience.Private
-interface StoreFileManager {
+public interface StoreFileManager {
/**
* Loads the initial store files into empty StoreFileManager.
* @param storeFiles The files to load.
@@ -56,7 +56,6 @@ interface StoreFileManager {
* Adds compaction results into the structure.
* @param compactedFiles The input files for the compaction.
* @param results The resulting files for the compaction.
- * @return The files that can be removed from storage. Usually,
*/
public abstract void addCompactionResults(
Collection<StoreFile> compactedFiles, Collection<StoreFile> results);
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java?rev=1450407&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionContext.java Tue Feb 26 20:51:59 2013
@@ -0,0 +1,80 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+
+
+/**
+ * This class holds all "physical" details necessary to run a compaction.
+ * It also holds the compaction request, which carries all the logical details.
+ * Hence, this class is basically the compaction.
+ */
+@InterfaceAudience.Private
+public abstract class CompactionContext {
+ protected CompactionRequest request = null;
+
+ /**
+ * Called before coprocessor preCompactSelection and should filter the candidates
+ * for coprocessor; i.e. exclude the files that definitely cannot be compacted at this time.
+ * @param filesCompacting files currently compacting
+ * @return the list of files that can theoretically be compacted.
+ */
+ public abstract List<StoreFile> preSelect(final List<StoreFile> filesCompacting);
+
+ /**
+ * Called to select files for compaction. Must fill in the request field if successful.
+ * @param filesCompacting Files currently being compacted by other compactions.
+ * @param isUserCompaction Whether this is a user compaction.
+ * @param mayUseOffPeak Whether the underlying policy may assume it's off-peak hours.
+ * @param forceMajor Whether to force major compaction.
+ * @return Whether the selection succeeded. Selection may be empty and lead to no compaction.
+ */
+ public abstract boolean select(
+ final List<StoreFile> filesCompacting, final boolean isUserCompaction,
+ final boolean mayUseOffPeak, final boolean forceMajor) throws IOException;
+
+ /**
+ * Forces external selection to be applied for this compaction.
+ * @param request The pre-cooked request with selection and other settings.
+ */
+ public void forceSelect(CompactionRequest request) {
+ this.request = request;
+ }
+
+ /**
+ * Runs the compaction based on current selection. select/forceSelect must have been called.
+ * @return The new file paths resulting from compaction.
+ */
+ public abstract List<Path> compact() throws IOException;
+
+ public CompactionRequest getRequest() {
+ assert hasSelection();
+ return this.request;
+ }
+
+ public boolean hasSelection() {
+ return this.request != null;
+ }
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java Tue Feb 26 20:51:59 2013
@@ -43,25 +43,6 @@ public abstract class CompactionPolicy {
}
/**
- * This is called before coprocessor preCompactSelection and should filter the candidates
- * for coprocessor; i.e. exclude the files that definitely cannot be compacted at this time.
- * @param candidateFiles candidate files, ordered from oldest to newest
- * @param filesCompacting files currently compacting
- * @return the list of files that can theoretically be compacted.
- */
- public abstract List<StoreFile> preSelectCompaction(
- List<StoreFile> candidateFiles, final List<StoreFile> filesCompacting);
-
- /**
- * @param candidateFiles candidate files, ordered from oldest to newest
- * @return subset copy of candidate list that meets compaction criteria
- * @throws java.io.IOException
- */
- public abstract CompactSelection selectCompaction(
- final List<StoreFile> candidateFiles, final boolean isUserCompaction,
- final boolean mayUseOffPeak, final boolean forceMajor) throws IOException;
-
- /**
* @param storeFiles Store files in the store.
* @return The system compaction priority of the store, based on storeFiles.
* The priority range is as such - the smaller values are higher priority;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionRequest.java Tue Feb 26 20:51:59 2013
@@ -18,22 +18,13 @@
*/
package org.apache.hadoop.hbase.regionserver.compactions;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadPoolExecutor;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hbase.RemoteExceptionHandler;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -46,231 +37,193 @@ import com.google.common.base.Predicate;
import com.google.common.collect.Collections2;
/**
- * This class holds all details necessary to run a compaction.
+ * This class holds all logical details necessary to run a compaction.
*/
@InterfaceAudience.LimitedPrivate({ "coprocessor" })
@InterfaceStability.Evolving
-public class CompactionRequest implements Comparable<CompactionRequest>,
- Runnable {
- static final Log LOG = LogFactory.getLog(CompactionRequest.class);
- private final HRegion region;
- private final HStore store;
- private CompactSelection compactSelection;
- private long totalSize;
- private boolean isMajor;
- private int priority;
- private final Long timeInNanos;
- private HRegionServer server = null;
-
- public static CompactionRequest getRequestForTesting(Collection<StoreFile> selection,
- boolean isMajor) {
- return new CompactionRequest(null, null, new CompactSelection(new ArrayList<StoreFile>(
- selection)), isMajor, 0, System.nanoTime());
- }
-
- /**
- * Constructor for a custom compaction. Uses the setXXX methods to update the state of the
- * compaction before being used.
- */
- public CompactionRequest(HRegion region, HStore store, int priority) {
- this(region, store, null, false, priority, System
- .nanoTime());
- }
-
- public CompactionRequest(HRegion r, HStore s, CompactSelection files, boolean isMajor, int p) {
- // delegate to the internal constructor after checking basic preconditions
- this(Preconditions.checkNotNull(r), s, Preconditions.checkNotNull(files), isMajor, p, System
- .nanoTime());
- }
-
- private CompactionRequest(HRegion region, HStore store, CompactSelection files, boolean isMajor,
- int priority, long startTime) {
- this.region = region;
- this.store = store;
- this.isMajor = isMajor;
- this.priority = priority;
- this.timeInNanos = startTime;
- if (files != null) {
- this.setSelection(files);
- }
- }
-
- /**
- * This function will define where in the priority queue the request will
- * end up. Those with the highest priorities will be first. When the
- * priorities are the same it will first compare priority then date
- * to maintain a FIFO functionality.
- *
- * <p>Note: The date is only accurate to the millisecond which means it is
- * possible that two requests were inserted into the queue within a
- * millisecond. When that is the case this function will break the tie
- * arbitrarily.
- */
- @Override
- public int compareTo(CompactionRequest request) {
- //NOTE: The head of the priority queue is the least element
- if (this.equals(request)) {
- return 0; //they are the same request
- }
- int compareVal;
-
- compareVal = priority - request.priority; //compare priority
- if (compareVal != 0) {
- return compareVal;
- }
-
- compareVal = timeInNanos.compareTo(request.timeInNanos);
- if (compareVal != 0) {
- return compareVal;
- }
-
- // break the tie based on hash code
- return this.hashCode() - request.hashCode();
- }
-
- @Override
- public boolean equals(Object obj) {
- return (this == obj);
- }
-
- /** Gets the HRegion for the request */
- public HRegion getHRegion() {
- return region;
- }
-
- /** Gets the Store for the request */
- public HStore getStore() {
- return store;
- }
-
- /** Gets the compact selection object for the request */
- public CompactSelection getCompactSelection() {
- return compactSelection;
- }
-
- /** Gets the StoreFiles for the request */
- public List<StoreFile> getFiles() {
- return compactSelection.getFilesToCompact();
- }
-
- /** Gets the total size of all StoreFiles in compaction */
- public long getSize() {
- return totalSize;
- }
-
- public boolean isMajor() {
- return this.isMajor;
- }
-
- /** Gets the priority for the request */
- public int getPriority() {
- return priority;
- }
-
- public long getSelectionTime() {
- return compactSelection.getSelectionTime();
- }
-
- /** Gets the priority for the request */
- public void setPriority(int p) {
- this.priority = p;
- }
-
- public void setServer(HRegionServer hrs) {
- this.server = hrs;
- }
-
- /**
- * Set the files (and, implicitly, the size of the compaction based on those files)
- * @param files files that should be included in the compaction
- */
- public void setSelection(CompactSelection files) {
- long sz = 0;
- for (StoreFile sf : files.getFilesToCompact()) {
- sz += sf.getReader().length();
- }
- this.totalSize = sz;
- this.compactSelection = files;
- }
-
- /**
- * Specify if this compaction should be a major compaction based on the state of the store
- * @param isMajor <tt>true</tt> if the system determines that this compaction should be a major
- * compaction
- */
- public void setIsMajor(boolean isMajor) {
- this.isMajor = isMajor;
- }
-
- @Override
- public String toString() {
- String fsList = Joiner.on(", ").join(
- Collections2.transform(Collections2.filter(
- compactSelection.getFilesToCompact(),
- new Predicate<StoreFile>() {
- public boolean apply(StoreFile sf) {
- return sf.getReader() != null;
- }
- }), new Function<StoreFile, String>() {
- public String apply(StoreFile sf) {
- return StringUtils.humanReadableInt(sf.getReader().length());
+public class CompactionRequest implements Comparable<CompactionRequest> {
+ static final Log LOG = LogFactory.getLog(CompactionRequest.class);
+ // was this compaction promoted to an off-peak
+ private boolean isOffPeak = false;
+ private boolean isMajor = false;
+ private int priority = Store.NO_PRIORITY;
+ private Collection<StoreFile> filesToCompact;
+
+ // CompactionRequest object creation time.
+ private long selectionTime;
+ // System time used to compare objects in FIFO order. TODO: maybe use selectionTime?
+ private Long timeInNanos;
+ private String regionName = "";
+ private String storeName = "";
+ private long totalSize = -1L;
+
+ /**
+ * This ctor should be used by coprocessors that want to subclass CompactionRequest.
+ */
+ public CompactionRequest() {
+ this.selectionTime = EnvironmentEdgeManager.currentTimeMillis();
+ this.timeInNanos = System.nanoTime();
+ }
+
+ public CompactionRequest(Collection<StoreFile> files) {
+ this();
+ Preconditions.checkNotNull(files);
+ this.filesToCompact = files;
+ recalculateSize();
+ }
+
+ /**
+ * Called before compaction is executed by CompactSplitThread; for use by coproc subclasses.
+ */
+ public void beforeExecute() {}
+
+ /**
+ * Called after compaction is executed by CompactSplitThread; for use by coproc subclasses.
+ */
+ public void afterExecute() {}
+
+ /**
+ * Combines this request with another request. Coprocessors subclassing CR may override
+ * this if they want to do clever things based on CompactionPolicy selection that
+ * is passed to this method via "other". The default implementation just does a copy.
+ * @param other Request to combine with.
+ * @return The result (may be "this" or "other").
+ */
+ public CompactionRequest combineWith(CompactionRequest other) {
+ this.filesToCompact = new ArrayList<StoreFile>(other.getFiles());
+ this.isOffPeak = other.isOffPeak;
+ this.isMajor = other.isMajor;
+ this.priority = other.priority;
+ this.selectionTime = other.selectionTime;
+ this.timeInNanos = other.timeInNanos;
+ this.regionName = other.regionName;
+ this.storeName = other.storeName;
+ this.totalSize = other.totalSize;
+ return this;
+ }
+
+ /**
+ * This function will define where in the priority queue the request will
+ * end up. Those with the highest priorities will be first. When the
+ * priorities are the same it will first compare priority then date
+ * to maintain a FIFO functionality.
+ *
+ * <p>Note: The date is only accurate to the millisecond which means it is
+ * possible that two requests were inserted into the queue within a
+ * millisecond. When that is the case this function will break the tie
+ * arbitrarily.
+ */
+ @Override
+ public int compareTo(CompactionRequest request) {
+ //NOTE: The head of the priority queue is the least element
+ if (this.equals(request)) {
+ return 0; //they are the same request
+ }
+ int compareVal;
+
+ compareVal = priority - request.priority; //compare priority
+ if (compareVal != 0) {
+ return compareVal;
+ }
+
+ compareVal = timeInNanos.compareTo(request.timeInNanos);
+ if (compareVal != 0) {
+ return compareVal;
+ }
+
+ // break the tie based on hash code
+ return this.hashCode() - request.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ return (this == obj);
+ }
+
+ public Collection<StoreFile> getFiles() {
+ return this.filesToCompact;
+ }
+
+ /**
+ * Sets the region/store name, for logging.
+ */
+ public void setDescription(String regionName, String storeName) {
+ this.regionName = regionName;
+ this.storeName = storeName;
+ }
+
+ /** Gets the total size of all StoreFiles in compaction */
+ public long getSize() {
+ return totalSize;
+ }
+
+ public boolean isMajor() {
+ return this.isMajor;
+ }
+
+ /** Gets the priority for the request */
+ public int getPriority() {
+ return priority;
+ }
+
+ /** Sets the priority for the request */
+ public void setPriority(int p) {
+ this.priority = p;
+ }
+
+ public boolean isOffPeak() {
+ return this.isOffPeak;
+ }
+
+ public void setOffPeak(boolean value) {
+ this.isOffPeak = value;
+ }
+
+ public long getSelectionTime() {
+ return this.selectionTime;
+ }
+
+ /**
+ * Specify if this compaction should be a major compaction based on the state of the store
+ * @param isMajor <tt>true</tt> if the system determines that this compaction should be a major
+ * compaction
+ */
+ public void setIsMajor(boolean isMajor) {
+ this.isMajor = isMajor;
+ }
+
+ @Override
+ public String toString() {
+ String fsList = Joiner.on(", ").join(
+ Collections2.transform(Collections2.filter(
+ this.getFiles(),
+ new Predicate<StoreFile>() {
+ public boolean apply(StoreFile sf) {
+ return sf.getReader() != null;
}
- }));
-
- return "regionName=" + region.getRegionNameAsString() +
- ", storeName=" + new String(store.getFamily().getName()) +
- ", fileCount=" + compactSelection.getFilesToCompact().size() +
- ", fileSize=" + StringUtils.humanReadableInt(totalSize) +
- ((fsList.isEmpty()) ? "" : " (" + fsList + ")") +
- ", priority=" + priority + ", time=" + timeInNanos;
- }
-
- @Override
- public void run() {
- Preconditions.checkNotNull(server);
- if (server.isStopped()) {
- return;
- }
- try {
- long start = EnvironmentEdgeManager.currentTimeMillis();
- boolean completed = region.compact(this);
- long now = EnvironmentEdgeManager.currentTimeMillis();
- LOG.info(((completed) ? "completed" : "aborted") + " compaction: " +
- this + "; duration=" + StringUtils.formatTimeDiff(now, start));
- if (completed) {
- // degenerate case: blocked regions require recursive enqueues
- if (store.getCompactPriority() <= 0) {
- server.compactSplitThread.requestCompaction(region, store, "Recursive enqueue", null);
- } else {
- // see if the compaction has caused us to exceed max region size
- server.getCompactSplitThread().requestSplit(region);
+ }), new Function<StoreFile, String>() {
+ public String apply(StoreFile sf) {
+ return StringUtils.humanReadableInt(sf.getReader().length());
}
- }
- } catch (IOException ex) {
- LOG.error("Compaction failed " + this, RemoteExceptionHandler
- .checkIOException(ex));
- server.checkFileSystem();
- } catch (Exception ex) {
- LOG.error("Compaction failed " + this, ex);
- server.checkFileSystem();
- } finally {
- store.finishRequest(this);
- LOG.debug("CompactSplitThread Status: " + server.compactSplitThread);
- }
- }
+ }));
- /**
- * Cleanup class to use when rejecting a compaction request from the queue.
- */
- public static class Rejection implements RejectedExecutionHandler {
-
- @Override
- public void rejectedExecution(Runnable request, ThreadPoolExecutor pool) {
- if (request instanceof CompactionRequest) {
- CompactionRequest cr = (CompactionRequest) request;
- LOG.debug("Compaction Rejected: " + cr);
- cr.getStore().finishRequest(cr);
- }
- }
+ return "regionName=" + regionName + ", storeName=" + storeName +
+ ", fileCount=" + this.getFiles().size() +
+ ", fileSize=" + StringUtils.humanReadableInt(totalSize) +
+ ((fsList.isEmpty()) ? "" : " (" + fsList + ")") +
+ ", priority=" + priority + ", time=" + timeInNanos;
+ }
+
+ /**
+ * Recalculate the size of the compaction based on current files.
+ * The size is the sum of the reader lengths of the files currently selected.
+ */
+ private void recalculateSize() {
+ long sz = 0;
+ for (StoreFile sf : this.filesToCompact) {
+ sz += sf.getReader().length();
}
+ this.totalSize = sz;
+ }
}
+
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java Tue Feb 26 20:51:59 2013
@@ -60,7 +60,9 @@ public abstract class Compactor {
*/
public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
throws IOException {
- return compact(CompactionRequest.getRequestForTesting(filesToCompact, isMajor));
+ CompactionRequest cr = new CompactionRequest(filesToCompact);
+ cr.setIsMajor(isMajor);
+ return this.compact(cr);
}
public CompactionProgress getProgress() {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactionPolicy.java Tue Feb 26 20:51:59 2013
@@ -32,9 +32,11 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreFileManager;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -49,16 +51,15 @@ import com.google.common.collect.Collect
*/
@InterfaceAudience.Private
public class DefaultCompactionPolicy extends CompactionPolicy {
-
private static final Log LOG = LogFactory.getLog(DefaultCompactionPolicy.class);
public DefaultCompactionPolicy(Configuration conf, StoreConfigInformation storeConfigInfo) {
super(conf, storeConfigInfo);
}
- @Override
- public List<StoreFile> preSelectCompaction(
- List<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
+
+ private ArrayList<StoreFile> getCurrentEligibleFiles(
+ ArrayList<StoreFile> candidateFiles, final List<StoreFile> filesCompacting) {
// candidates = all storefiles not already in compaction queue
if (!filesCompacting.isEmpty()) {
// exclude all files older than the newest file we're currently
@@ -71,6 +72,11 @@ public class DefaultCompactionPolicy ext
return candidateFiles;
}
+ public List<StoreFile> preSelectCompactionForCoprocessor(
+ final Collection<StoreFile> candidates, final List<StoreFile> filesCompacting) {
+ return getCurrentEligibleFiles(new ArrayList<StoreFile>(candidates), filesCompacting);
+ }
+
@Override
public int getSystemCompactionPriority(final Collection<StoreFile> storeFiles) {
return this.comConf.getBlockingStorefileCount() - storeFiles.size();
@@ -81,20 +87,20 @@ public class DefaultCompactionPolicy ext
* @return subset copy of candidate list that meets compaction criteria
* @throws java.io.IOException
*/
- @Override
- public CompactSelection selectCompaction(List<StoreFile> candidateFiles,
- final boolean isUserCompaction, final boolean mayUseOffPeak, final boolean forceMajor)
- throws IOException {
+ public CompactionRequest selectCompaction(Collection<StoreFile> candidateFiles,
+ final List<StoreFile> filesCompacting, final boolean isUserCompaction,
+ final boolean mayUseOffPeak, final boolean forceMajor) throws IOException {
// Preliminary compaction subject to filters
- CompactSelection candidateSelection = new CompactSelection(candidateFiles);
+ ArrayList<StoreFile> candidateSelection = new ArrayList<StoreFile>(candidateFiles);
+ candidateSelection = getCurrentEligibleFiles(candidateSelection, filesCompacting);
long cfTtl = this.storeConfigInfo.getStoreFileTtl();
if (!forceMajor) {
// If there are expired files, only select them so that compaction deletes them
if (comConf.shouldDeleteExpired() && (cfTtl != Long.MAX_VALUE)) {
- CompactSelection expiredSelection = selectExpiredStoreFiles(
- candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
+ ArrayList<StoreFile> expiredSelection = selectExpiredStoreFiles(
+ candidateSelection, EnvironmentEdgeManager.currentTimeMillis() - cfTtl);
if (expiredSelection != null) {
- return expiredSelection;
+ return new CompactionRequest(expiredSelection);
}
}
candidateSelection = skipLargeFiles(candidateSelection);
@@ -106,21 +112,23 @@ public class DefaultCompactionPolicy ext
// Or, if there are any references among the candidates.
boolean majorCompaction = (
(forceMajor && isUserCompaction)
- || ((forceMajor || isMajorCompaction(candidateSelection.getFilesToCompact()))
- && (candidateSelection.getFilesToCompact().size() < comConf.getMaxFilesToCompact()))
- || StoreUtils.hasReferences(candidateSelection.getFilesToCompact())
+ || ((forceMajor || isMajorCompaction(candidateSelection))
+ && (candidateSelection.size() < comConf.getMaxFilesToCompact()))
+ || StoreUtils.hasReferences(candidateSelection)
);
if (!majorCompaction) {
// we're doing a minor compaction, let's see what files are applicable
- candidateSelection.setOffPeak(mayUseOffPeak);
candidateSelection = filterBulk(candidateSelection);
- candidateSelection = applyCompactionPolicy(candidateSelection);
+ candidateSelection = applyCompactionPolicy(candidateSelection, mayUseOffPeak);
candidateSelection = checkMinFilesCriteria(candidateSelection);
}
- candidateSelection =
- removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
- return candidateSelection;
+ candidateSelection = removeExcessFiles(candidateSelection, isUserCompaction, majorCompaction);
+ CompactionRequest result = new CompactionRequest(candidateSelection);
+ if (!majorCompaction && !candidateSelection.isEmpty()) {
+ result.setOffPeak(mayUseOffPeak);
+ }
+ return result;
}
/**
@@ -133,33 +141,25 @@ public class DefaultCompactionPolicy ext
* @return A CompactSelection contains the expired store files as
* filesToCompact
*/
- private CompactSelection selectExpiredStoreFiles(
- CompactSelection candidates, long maxExpiredTimeStamp) {
- List<StoreFile> filesToCompact = candidates.getFilesToCompact();
- if (filesToCompact == null || filesToCompact.size() == 0)
- return null;
+ private ArrayList<StoreFile> selectExpiredStoreFiles(
+ ArrayList<StoreFile> candidates, long maxExpiredTimeStamp) {
+ if (candidates == null || candidates.size() == 0) return null;
ArrayList<StoreFile> expiredStoreFiles = null;
- boolean hasExpiredStoreFiles = false;
- CompactSelection expiredSFSelection = null;
- for (StoreFile storeFile : filesToCompact) {
+ for (StoreFile storeFile : candidates) {
if (storeFile.getReader().getMaxTimestamp() < maxExpiredTimeStamp) {
LOG.info("Deleting the expired store file by compaction: "
+ storeFile.getPath() + " whose maxTimeStamp is "
+ storeFile.getReader().getMaxTimestamp()
+ " while the max expired timestamp is " + maxExpiredTimeStamp);
- if (!hasExpiredStoreFiles) {
+ if (expiredStoreFiles == null) {
expiredStoreFiles = new ArrayList<StoreFile>();
- hasExpiredStoreFiles = true;
}
expiredStoreFiles.add(storeFile);
}
}
- if (hasExpiredStoreFiles) {
- expiredSFSelection = new CompactSelection(expiredStoreFiles);
- }
- return expiredSFSelection;
+ return expiredStoreFiles;
}
/**
@@ -168,18 +168,16 @@ public class DefaultCompactionPolicy ext
* exclude all files above maxCompactSize
* Also save all references. We MUST compact them
*/
- private CompactSelection skipLargeFiles(CompactSelection candidates) {
+ private ArrayList<StoreFile> skipLargeFiles(ArrayList<StoreFile> candidates) {
int pos = 0;
- while (pos < candidates.getFilesToCompact().size() &&
- candidates.getFilesToCompact().get(pos).getReader().length() >
- comConf.getMaxCompactSize() &&
- !candidates.getFilesToCompact().get(pos).isReference()) {
+ while (pos < candidates.size() && !candidates.get(pos).isReference()
+ && (candidates.get(pos).getReader().length() > comConf.getMaxCompactSize())) {
++pos;
}
if (pos > 0) {
LOG.debug("Some files are too large. Excluding " + pos
+ " files from compaction candidates");
- candidates.clearSubList(0, pos);
+ candidates.subList(0, pos).clear();
}
return candidates;
}
@@ -189,9 +187,8 @@ public class DefaultCompactionPolicy ext
* @return filtered subset
* exclude all bulk load files if configured
*/
- private CompactSelection filterBulk(CompactSelection candidates) {
- candidates.getFilesToCompact().removeAll(Collections2.filter(
- candidates.getFilesToCompact(),
+ private ArrayList<StoreFile> filterBulk(ArrayList<StoreFile> candidates) {
+ candidates.removeAll(Collections2.filter(candidates,
new Predicate<StoreFile>() {
@Override
public boolean apply(StoreFile input) {
@@ -206,9 +203,9 @@ public class DefaultCompactionPolicy ext
* @return filtered subset
* take upto maxFilesToCompact from the start
*/
- private CompactSelection removeExcessFiles(CompactSelection candidates,
+ private ArrayList<StoreFile> removeExcessFiles(ArrayList<StoreFile> candidates,
boolean isUserCompaction, boolean isMajorCompaction) {
- int excess = candidates.getFilesToCompact().size() - comConf.getMaxFilesToCompact();
+ int excess = candidates.size() - comConf.getMaxFilesToCompact();
if (excess > 0) {
if (isMajorCompaction && isUserCompaction) {
LOG.debug("Warning, compacting more than " + comConf.getMaxFilesToCompact() +
@@ -216,8 +213,7 @@ public class DefaultCompactionPolicy ext
} else {
LOG.debug("Too many admissible files. Excluding " + excess
+ " files from compaction candidates");
- candidates.clearSubList(comConf.getMaxFilesToCompact(),
- candidates.getFilesToCompact().size());
+ candidates.subList(comConf.getMaxFilesToCompact(), candidates.size()).clear();
}
}
return candidates;
@@ -227,16 +223,14 @@ public class DefaultCompactionPolicy ext
* @return filtered subset
* forget the compactionSelection if we don't have enough files
*/
- private CompactSelection checkMinFilesCriteria(CompactSelection candidates) {
+ private ArrayList<StoreFile> checkMinFilesCriteria(ArrayList<StoreFile> candidates) {
int minFiles = comConf.getMinFilesToCompact();
- if (candidates.getFilesToCompact().size() < minFiles) {
+ if (candidates.size() < minFiles) {
if(LOG.isDebugEnabled()) {
- LOG.debug("Not compacting files because we only have " +
- candidates.getFilesToCompact().size() +
- " files ready for compaction. Need " + minFiles + " to initiate.");
+ LOG.debug("Not compacting files because we only have " + candidates.size() +
+ " files ready for compaction. Need " + minFiles + " to initiate.");
}
- candidates.emptyFileList();
- candidates.setOffPeak(false);
+ candidates.clear();
}
return candidates;
}
@@ -271,25 +265,26 @@ public class DefaultCompactionPolicy ext
* | | | | | | | | | | | |
* | | | | | | | | | | | |
*/
- CompactSelection applyCompactionPolicy(CompactSelection candidates) throws IOException {
- if (candidates.getFilesToCompact().isEmpty()) {
+ ArrayList<StoreFile> applyCompactionPolicy(
+ ArrayList<StoreFile> candidates, boolean mayUseOffPeak) throws IOException {
+ if (candidates.isEmpty()) {
return candidates;
}
// we're doing a minor compaction, let's see what files are applicable
int start = 0;
double ratio = comConf.getCompactionRatio();
- if (candidates.isOffPeakCompaction()) {
+ if (mayUseOffPeak) {
ratio = comConf.getCompactionRatioOffPeak();
LOG.info("Running an off-peak compaction, selection ratio = " + ratio);
}
// get store file sizes for incremental compacting selection.
- int countOfFiles = candidates.getFilesToCompact().size();
+ final int countOfFiles = candidates.size();
long[] fileSizes = new long[countOfFiles];
long[] sumSize = new long[countOfFiles];
for (int i = countOfFiles - 1; i >= 0; --i) {
- StoreFile file = candidates.getFilesToCompact().get(i);
+ StoreFile file = candidates.get(i);
fileSizes[i] = file.getReader().length();
// calculate the sum of fileSizes[i,i+maxFilesToCompact-1) for algo
int tooFar = i + comConf.getMaxFilesToCompact() - 1;
@@ -309,8 +304,9 @@ public class DefaultCompactionPolicy ext
+ " files from " + countOfFiles + " candidates");
}
- candidates = candidates.getSubList(start, countOfFiles);
-
+ if (start > 0) {
+ candidates.subList(0, start).clear();
+ }
return candidates;
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1450407&r1=1450406&r2=1450407&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Tue Feb 26 20:51:59 2013
@@ -594,7 +594,7 @@ public class TestCompaction extends HBas
HStore store = (HStore) r.getStore(COLUMN_FAMILY);
Collection<StoreFile> storeFiles = store.getStorefiles();
- Compactor tool = store.compactor;
+ Compactor tool = store.storeEngine.getCompactor();
List<Path> newFiles = tool.compactForTesting(storeFiles, false);
@@ -611,7 +611,7 @@ public class TestCompaction extends HBas
stream.close();
try {
- store.completeCompaction(storeFiles, origPath);
+ ((HStore)store).moveFileIntoPlace(origPath);
} catch (Exception e) {
// The complete compaction should fail and the corrupt file should remain
// in the 'tmp' directory;
@@ -635,7 +635,7 @@ public class TestCompaction extends HBas
}
store.triggerMajorCompaction();
- CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null);
+ CompactionRequest request = store.requestCompaction(Store.NO_PRIORITY, null).getRequest();
assertNotNull("Expected to receive a compaction request", request);
assertEquals(
"System-requested major compaction should not occur if there are too many store files",
@@ -653,7 +653,7 @@ public class TestCompaction extends HBas
createStoreFile(r);
}
store.triggerMajorCompaction();
- CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null);
+ CompactionRequest request = store.requestCompaction(Store.PRIORITY_USER, null).getRequest();
assertNotNull("Expected to receive a compaction request", request);
assertEquals(
"User-requested major compaction should always occur, even if there are too many store files",
@@ -680,7 +680,7 @@ public class TestCompaction extends HBas
}
CountDownLatch latch = new CountDownLatch(1);
- TrackableCompactionRequest request = new TrackableCompactionRequest(r, (HStore) store, latch);
+ TrackableCompactionRequest request = new TrackableCompactionRequest(latch);
thread.requestCompaction(r, store, "test custom comapction", Store.PRIORITY_USER, request);
// wait for the latch to complete.
latch.await();
@@ -698,16 +698,15 @@ public class TestCompaction extends HBas
* Constructor for a custom compaction. Uses the setXXX methods to update the state of the
* compaction before being used.
*/
- public TrackableCompactionRequest(HRegion region, HStore store, CountDownLatch finished) {
- super(region, store, Store.PRIORITY_USER);
+ public TrackableCompactionRequest(CountDownLatch finished) {
+ super();
this.done = finished;
}
@Override
- public void run() {
- super.run();
+ public void afterExecute() {
+ super.afterExecute();
this.done.countDown();
}
}
-
}