You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2013/10/29 01:37:30 UTC
svn commit: r1536569 [1/2] - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/compactions/
test/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase...
Author: sershe
Date: Tue Oct 29 00:37:30 2013
New Revision: 1536569
URL: http://svn.apache.org/r1536569
Log:
HBASE-7680 implement compaction policy for stripe compactions
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreEngine.java Tue Oct 29 00:37:30 2013
@@ -54,6 +54,12 @@ public class DefaultStoreEngine extends
DEFAULT_COMPACTION_POLICY_CLASS = ExploringCompactionPolicy.class;
@Override
+ public boolean needsCompaction(List<StoreFile> filesCompacting) {
+ return compactionPolicy.needsCompaction(
+ this.storeFileManager.getStorefiles(), filesCompacting);
+ }
+
+ @Override
protected void createComponents(
Configuration conf, Store store, KVComparator kvComparator) throws IOException {
storeFileManager = new DefaultStoreFileManager(kvComparator, conf);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Tue Oct 29 00:37:30 2013
@@ -71,6 +71,8 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.OffPeakHours;
import org.apache.hadoop.hbase.regionserver.wal.HLogUtil;
import org.apache.hadoop.hbase.util.Bytes;
@@ -1194,8 +1196,8 @@ public class HStore implements Store {
try {
// Ready to go. Have list of files to compact.
- List<Path> newFiles =
- this.storeEngine.getCompactor().compactForTesting(filesToCompact, isMajor);
+ List<Path> newFiles = ((DefaultCompactor)this.storeEngine.getCompactor())
+ .compactForTesting(filesToCompact, isMajor);
for (Path newFile: newFiles) {
// Move the compaction into place.
StoreFile sf = moveFileIntoPlace(newFile);
@@ -1881,8 +1883,7 @@ public class HStore implements Store {
@Override
public boolean needsCompaction() {
- return storeEngine.getCompactionPolicy().needsCompaction(
- this.storeEngine.getStoreFileManager().getStorefiles(), filesCompacting);
+ return this.storeEngine.needsCompaction(this.filesCompacting);
}
@Override
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreConfigInformation.java Tue Oct 29 00:37:30 2013
@@ -31,7 +31,6 @@ import org.apache.hadoop.classification.
@InterfaceStability.Unstable
public interface StoreConfigInformation {
/**
- * TODO: remove after HBASE-7252 is fixed.
* @return Gets the Memstore flush size for the region that this store works with.
*/
long getMemstoreFlushSize();
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreEngine.java Tue Oct 29 00:37:30 2013
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.io.IOException;
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -80,6 +81,12 @@ public abstract class StoreEngine<SF ext
}
/**
+ * @param filesCompacting Files currently compacting
+ * @return whether a compaction selection is possible
+ */
+ public abstract boolean needsCompaction(List<StoreFile> filesCompacting);
+
+ /**
* Creates an instance of a compaction context specific to this engine.
* Doesn't actually select or start a compaction. See CompactionContext class comment.
* @return New CompactionContext object.
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Oct 29 00:37:30 2013
@@ -285,7 +285,7 @@ public class StoreFile {
return modificationTimeStamp;
}
- byte[] getMetadataValue(byte[] key) {
+ public byte[] getMetadataValue(byte[] key) {
return metadataMap.get(key);
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java Tue Oct 29 00:37:30 2013
@@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -28,25 +30,116 @@ import org.apache.hadoop.conf.Configurat
*/
@InterfaceAudience.Private
public class StripeStoreConfig {
- public static final String MAX_SPLIT_IMBALANCE = "hbase.store.stripe.split.max.imbalance";
- private float maxSplitImbalance;
+ static final Log LOG = LogFactory.getLog(StripeStoreConfig.class);
- public StripeStoreConfig(Configuration config) {
- maxSplitImbalance = config.getFloat(MAX_SPLIT_IMBALANCE, 1.5f);
- if (maxSplitImbalance == 0) {
- maxSplitImbalance = 1.5f;
+ /** The maximum number of files to compact within a stripe; same as for regular compaction. */
+ public static final String MAX_FILES_KEY = "hbase.store.stripe.compaction.maxFiles";
+ /** The minimum number of files to compact within a stripe; same as for regular compaction. */
+ public static final String MIN_FILES_KEY = "hbase.store.stripe.compaction.minFiles";
+
+ /** The minimum number of files to compact when compacting L0; same as minFiles for regular
+ * compaction. Given that L0 causes unnecessary overwriting of the data, should be higher than
+ * regular minFiles. */
+ public static final String MIN_FILES_L0_KEY = "hbase.store.stripe.compaction.minFilesL0";
+
+ /** The size the stripe should achieve to be considered for splitting into multiple stripes.
+ Stripe will be split when it can be fully compacted, and it is above this size. */
+ public static final String SIZE_TO_SPLIT_KEY = "hbase.store.stripe.sizeToSplit";
+ /** The target count of new stripes to produce when splitting a stripe. A floating point
+ number, default is 2. Values less than 1 will be converted to 1/x. Non-whole numbers will
+ produce unbalanced splits, which may be good for some cases. In this case the "smaller" of
+ the new stripes will always be the rightmost one. If the stripe is bigger than sizeToSplit
+ when splitting, this will be adjusted by a whole increment. */
+ public static final String SPLIT_PARTS_KEY = "hbase.store.stripe.splitPartCount";
+ /** The initial stripe count to create. If the row distribution is roughly the same over time,
+ it's good to set this to a count of stripes that is expected to be achieved in most regions,
+ to get this count from the outset and prevent unnecessary splitting. */
+ public static final String INITIAL_STRIPE_COUNT_KEY = "hbase.store.stripe.initialStripeCount";
+
+ /** When splitting region, the maximum size imbalance to allow in an attempt to split at a
+ stripe boundary, so that no files go to both regions. Most users won't need to change that. */
+ public static final String MAX_REGION_SPLIT_IMBALANCE_KEY =
+ "hbase.store.stripe.region.split.max.imbalance";
+
+
+ private final float maxRegionSplitImbalance;
+ private final int level0CompactMinFiles;
+ private final int stripeCompactMinFiles;
+ private final int stripeCompactMaxFiles;
+
+ private final int initialCount;
+ private final long sizeToSplitAt;
+ private final float splitPartCount;
+ private final long splitPartSize; // derived from sizeToSplitAt and splitPartCount
+
+ private final double EPSILON = 0.001; // good enough for this, not a real epsilon.
+ public StripeStoreConfig(Configuration config, StoreConfigInformation sci) {
+ this.level0CompactMinFiles = config.getInt(MIN_FILES_L0_KEY, 4);
+ this.stripeCompactMinFiles = config.getInt(MIN_FILES_KEY, 3);
+ this.stripeCompactMaxFiles = config.getInt(MAX_FILES_KEY, 10);
+ this.maxRegionSplitImbalance = getFloat(config, MAX_REGION_SPLIT_IMBALANCE_KEY, 1.5f, true);
+
+ this.splitPartCount = getFloat(config, SPLIT_PARTS_KEY, 2.0f, true);
+ if (Math.abs(splitPartCount - 1.0) < EPSILON) {
+ throw new RuntimeException("Split part count cannot be 1: " + this.splitPartCount);
+ }
+ // TODO: change when no L0.
+ // Arbitrary default split size - 4 times the size of one L0 compaction.
+ double flushSize = sci.getMemstoreFlushSize();
+ if (flushSize == 0) {
+ flushSize = 128 * 1024 * 1024;
}
- if (maxSplitImbalance < 1f) {
- maxSplitImbalance = 1f / maxSplitImbalance;
+ long defaultSplitSize = (long)(flushSize * getLevel0MinFiles() * 4 * splitPartCount);
+ this.sizeToSplitAt = config.getLong(SIZE_TO_SPLIT_KEY, defaultSplitSize);
+ this.initialCount = config.getInt(INITIAL_STRIPE_COUNT_KEY, 1);
+ this.splitPartSize = (long)(this.sizeToSplitAt / this.splitPartCount);
+ }
+
+ private static float getFloat(
+ Configuration config, String key, float defaultValue, boolean moreThanOne) {
+ float value = config.getFloat(key, defaultValue);
+ if (value == 0) {
+ LOG.warn(String.format("%s is set to 0; using default value of %f", key, defaultValue));
+ value = defaultValue;
+ } else if ((value > 1f) != moreThanOne) {
+ value = 1f / value;
}
+ return value;
+ }
+
+ public float getMaxSplitImbalance() {
+ return this.maxRegionSplitImbalance;
+ }
+
+ public int getLevel0MinFiles() {
+ return level0CompactMinFiles;
+ }
+
+ public int getStripeCompactMinFiles() {
+ return stripeCompactMinFiles;
+ }
+
+ public int getStripeCompactMaxFiles() {
+ return stripeCompactMaxFiles;
+ }
+
+ public long getSplitSize() {
+ return sizeToSplitAt;
+ }
+
+ public int getInitialCount() {
+ return initialCount;
+ }
+
+ public float getSplitCount() {
+ return splitPartCount;
}
/**
- * @return the maximum imbalance to tolerate between sides when splitting the region
- * at the stripe boundary. If the ratio of a larger to a smaller side of the split on
- * the stripe-boundary is bigger than this, then some stripe will be split.
+ * @return the desired size of the target stripe when splitting, in bytes.
+ * Derived from {@link #getSplitSize()} and {@link #getSplitCount()}.
*/
- public float getMaxSplitImbalance() {
- return this.maxSplitImbalance;
+ public long getSplitPartSize() {
+ return splitPartSize;
}
}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java?rev=1536569&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java Tue Oct 29 00:37:30 2013
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * The storage engine that implements the stripe-based store/compaction scheme.
+ */
+@InterfaceAudience.Private
+public class StripeStoreEngine extends StoreEngine<DefaultStoreFlusher,
+ StripeCompactionPolicy, StripeCompactor, StripeStoreFileManager> {
+ static final Log LOG = LogFactory.getLog(StripeStoreEngine.class);
+ private StripeStoreConfig config;
+
+ @Override
+ public boolean needsCompaction(List<StoreFile> filesCompacting) {
+ return this.compactionPolicy.needsCompactions(this.storeFileManager, filesCompacting);
+ }
+
+ @Override
+ public CompactionContext createCompaction() {
+ return new StripeCompaction();
+ }
+
+ @Override
+ protected void createComponents(
+ Configuration conf, Store store, KVComparator comparator) throws IOException {
+ this.config = new StripeStoreConfig(conf, store);
+ this.compactionPolicy = new StripeCompactionPolicy(conf, store, config);
+ this.storeFlusher = new DefaultStoreFlusher(conf, store);
+ this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config);
+ this.compactor = new StripeCompactor(conf, store);
+ }
+
+ /**
+ * Represents one instance of stripe compaction, with the necessary context and flow.
+ */
+ private class StripeCompaction extends CompactionContext {
+ private StripeCompactionPolicy.StripeCompactionRequest stripeRequest = null;
+
+ @Override
+ public List<StoreFile> preSelect(List<StoreFile> filesCompacting) {
+ return compactionPolicy.preSelectFilesForCoprocessor(storeFileManager, filesCompacting);
+ }
+
+ @Override
+ public boolean select(List<StoreFile> filesCompacting, boolean isUserCompaction,
+ boolean mayUseOffPeak, boolean forceMajor) throws IOException {
+ this.stripeRequest = compactionPolicy.selectCompaction(
+ storeFileManager, filesCompacting, mayUseOffPeak);
+ this.request = (this.stripeRequest == null)
+ ? new CompactionRequest(new ArrayList<StoreFile>()) : this.stripeRequest.getRequest();
+ return this.stripeRequest != null;
+ }
+
+ @Override
+ public void forceSelect(CompactionRequest request) {
+ super.forceSelect(request);
+ if (this.stripeRequest != null) {
+ this.stripeRequest.setRequest(this.request);
+ } else {
+ LOG.warn("Stripe store is forced to take an arbitrary file list and compact it.");
+ this.stripeRequest = compactionPolicy.createEmptyRequest(storeFileManager, this.request);
+ }
+ }
+
+ @Override
+ public List<Path> compact() throws IOException {
+ Preconditions.checkArgument(this.stripeRequest != null, "Cannot compact without selection");
+ return this.stripeRequest.execute(compactor);
+ }
+ }
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java Tue Oct 29 00:37:30 2013
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
@@ -58,7 +59,8 @@ import com.google.common.collect.Immutab
* - Compaction has one contiguous set of stripes both in and out, except if L0 is involved.
*/
@InterfaceAudience.Private
-class StripeStoreFileManager implements StoreFileManager {
+public class StripeStoreFileManager
+ implements StoreFileManager, StripeCompactionPolicy.StripeInformationProvider {
static final Log LOG = LogFactory.getLog(StripeStoreFileManager.class);
/**
@@ -81,16 +83,16 @@ class StripeStoreFileManager implements
*/
private static class State {
/**
- * The end keys of each stripe. The last stripe end is always open-ended, so it's not stored
- * here. It is invariant that the start key of the stripe is the end key of the previous one
+ * The end rows of each stripe. The last stripe end is always open-ended, so it's not stored
+ * here. It is invariant that the start row of the stripe is the end row of the previous one
* (and is an open boundary for the first one).
*/
public byte[][] stripeEndRows = new byte[0][];
/**
- * Files by stripe. Each element of the list corresponds to stripeEndKey with the corresponding
- * index, except the last one. Inside each list, the files are in reverse order by seqNum.
- * Note that the length of this is one higher than that of stripeEndKeys.
+ * Files by stripe. Each element of the list corresponds to stripeEndRow element with the
+ * same index, except the last one. Inside each list, the files are in reverse order by
+ * seqNum. Note that the length of this is one higher than that of stripeEndRows.
*/
public ArrayList<ImmutableList<StoreFile>> stripeFiles
= new ArrayList<ImmutableList<StoreFile>>();
@@ -105,16 +107,20 @@ class StripeStoreFileManager implements
/** Cached file metadata (or overrides as the case may be) */
private HashMap<StoreFile, byte[]> fileStarts = new HashMap<StoreFile, byte[]>();
private HashMap<StoreFile, byte[]> fileEnds = new HashMap<StoreFile, byte[]>();
+ /** Normally invalid key is null, but in the map null is the result for "no key"; so use
+ * the following constant value in these maps instead. Note that this is a constant and
+ * we use it to compare by reference when we read from the map. */
+ private static final byte[] INVALID_KEY_IN_MAP = new byte[0];
private final KVComparator kvComparator;
private StripeStoreConfig config;
private final int blockingFileCount;
- public StripeStoreFileManager(KVComparator kvComparator, Configuration conf) throws Exception {
+ public StripeStoreFileManager(
+ KVComparator kvComparator, Configuration conf, StripeStoreConfig config) {
this.kvComparator = kvComparator;
- // TODO: create this in a shared manner in StoreEngine when there's one
- this.config = new StripeStoreConfig(conf);
+ this.config = config;
this.blockingFileCount = conf.getInt(
HStore.BLOCKING_STOREFILES_KEY, HStore.DEFAULT_BLOCKING_STOREFILE_COUNT);
}
@@ -378,7 +384,7 @@ class StripeStoreFileManager implements
while (entryIter.hasNext()) {
Map.Entry<byte[], ArrayList<StoreFile>> entry = entryIter.next();
ArrayList<StoreFile> files = entry.getValue();
- // Validate the file start keys, and remove the bad ones to level 0.
+ // Validate the file start rows, and remove the bad ones to level 0.
for (int i = 0; i < files.size(); ++i) {
StoreFile sf = files.get(i);
byte[] startRow = startOf(sf);
@@ -459,15 +465,8 @@ class StripeStoreFileManager implements
}
private void ensureLevel0Metadata(StoreFile sf) {
- if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, null);
- if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, null);
- }
-
- /**
- * For testing.
- */
- List<StoreFile> getLevel0Files() {
- return state.level0Files;
+ if (!isInvalid(startOf(sf))) this.fileStarts.put(sf, INVALID_KEY_IN_MAP);
+ if (!isInvalid(endOf(sf))) this.fileEnds.put(sf, INVALID_KEY_IN_MAP);
}
private void debugDumpState(String string) {
@@ -515,7 +514,7 @@ class StripeStoreFileManager implements
}
/**
- * Finds the stripe index by end key.
+ * Finds the stripe index by end row.
*/
private final int findStripeIndexByEndRow(byte[] endRow) {
assert !isInvalid(endRow);
@@ -524,7 +523,7 @@ class StripeStoreFileManager implements
}
/**
- * Finds the stripe index for the stripe containing a key provided externally for get/scan.
+ * Finds the stripe index for the stripe containing a row provided externally for get/scan.
*/
private final int findStripeForRow(byte[] row, boolean isStart) {
if (isStart && row == HConstants.EMPTY_START_ROW) return 0;
@@ -537,33 +536,28 @@ class StripeStoreFileManager implements
return Math.abs(Arrays.binarySearch(state.stripeEndRows, row, Bytes.BYTES_COMPARATOR) + 1);
}
- /**
- * Gets the start key for a given stripe.
- * @param stripeIndex Stripe index.
- * @return Start key. May be an open key.
- */
+ @Override
public final byte[] getStartRow(int stripeIndex) {
return (stripeIndex == 0 ? OPEN_KEY : state.stripeEndRows[stripeIndex - 1]);
}
- /**
- * Gets the start key for a given stripe.
- * @param stripeIndex Stripe index.
- * @return Start key. May be an open key.
- */
+ @Override
public final byte[] getEndRow(int stripeIndex) {
return (stripeIndex == state.stripeEndRows.length
? OPEN_KEY : state.stripeEndRows[stripeIndex]);
}
+
private byte[] startOf(StoreFile sf) {
byte[] result = this.fileStarts.get(sf);
- return result != null ? result : sf.getMetadataValue(STRIPE_START_KEY);
+ return result == null ? sf.getMetadataValue(STRIPE_START_KEY)
+ : (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
}
private byte[] endOf(StoreFile sf) {
byte[] result = this.fileEnds.get(sf);
- return result != null ? result : sf.getMetadataValue(STRIPE_END_KEY);
+ return result == null ? sf.getMetadataValue(STRIPE_END_KEY)
+ : (result == INVALID_KEY_IN_MAP ? INVALID_KEY : result);
}
/**
@@ -793,8 +787,8 @@ class StripeStoreFileManager implements
/**
* See {@link #addCompactionResults(Collection, Collection)} - updates the stripe list with
- * new candidate stripes/removes old stripes; produces new set of stripe end keys.
- * @param newStripes New stripes - files by end key.
+ * new candidate stripes/removes old stripes; produces new set of stripe end rows.
+ * @param newStripes New stripes - files by end row.
*/
private void processNewCandidateStripes(
TreeMap<byte[], StoreFile> newStripes) throws IOException {
@@ -859,4 +853,31 @@ class StripeStoreFileManager implements
}
}
}
+
+ @Override
+ public List<StoreFile> getLevel0Files() {
+ return this.state.level0Files;
+ }
+
+ @Override
+ public List<byte[]> getStripeBoundaries() {
+ if (this.state.stripeFiles.isEmpty()) return new ArrayList<byte[]>();
+ ArrayList<byte[]> result = new ArrayList<byte[]>(this.state.stripeEndRows.length + 2);
+ result.add(OPEN_KEY);
+ for (int i = 0; i < this.state.stripeEndRows.length; ++i) {
+ result.add(this.state.stripeEndRows[i]);
+ }
+ result.add(OPEN_KEY);
+ return result;
+ }
+
+ @Override
+ public ArrayList<ImmutableList<StoreFile>> getStripes() {
+ return this.state.stripeFiles;
+ }
+
+ @Override
+ public int getStripeCount() {
+ return this.state.stripeFiles.size();
+ }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionConfiguration.java Tue Oct 29 00:37:30 2013
@@ -47,6 +47,7 @@ public class CompactionConfiguration {
static final Log LOG = LogFactory.getLog(CompactionConfiguration.class);
private static final String CONFIG_PREFIX = "hbase.hstore.compaction.";
+ public static final String RATIO_KEY = CONFIG_PREFIX + "ratio";
Configuration conf;
StoreConfigInformation storeConfigInfo;
@@ -72,7 +73,7 @@ public class CompactionConfiguration {
minFilesToCompact = Math.max(2, conf.getInt(CONFIG_PREFIX + "min",
/*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
maxFilesToCompact = conf.getInt(CONFIG_PREFIX + "max", 10);
- compactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio", 1.2F);
+ compactionRatio = conf.getFloat(RATIO_KEY, 1.2F);
offPeekCompactionRatio = conf.getFloat(CONFIG_PREFIX + "ratio.offpeak", 5.0F);
throttlePoint = conf.getLong("hbase.regionserver.thread.compaction.throttle",
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/CompactionPolicy.java Tue Oct 29 00:37:30 2013
@@ -56,14 +56,6 @@ public abstract class CompactionPolicy {
public abstract boolean throttleCompaction(long compactionSize);
/**
- * @param storeFiles Current store files.
- * @param filesCompacting files currently compacting.
- * @return whether a compactionSelection is possible
- */
- public abstract boolean needsCompaction(final Collection<StoreFile> storeFiles,
- final List<StoreFile> filesCompacting);
-
- /**
* Inform the policy that some configuration has been changed,
* so cached values should be updated, if any.
*/
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/Compactor.java Tue Oct 29 00:37:30 2013
@@ -79,32 +79,6 @@ public abstract class Compactor {
void append(KeyValue kv) throws IOException;
}
- /**
- * Do a minor/major compaction on an explicit set of storefiles from a Store.
- * @param request the requested compaction
- * @return Product of compaction or an empty list if all cells expired or deleted and nothing made
- * it through the compaction.
- * @throws IOException
- */
- public abstract List<Path> compact(final CompactionRequest request) throws IOException;
-
- /**
- * Compact a list of files for testing. Creates a fake {@link CompactionRequest} to pass to
- * {@link #compact(CompactionRequest)};
- * @param filesToCompact the files to compact. These are used as the compactionSelection for the
- * generated {@link CompactionRequest}.
- * @param isMajor true to major compact (prune all deletes, max versions, etc)
- * @return Product of compaction or an empty list if all cells expired or deleted and nothing made
- * it through the compaction.
- * @throws IOException
- */
- public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
- throws IOException {
- CompactionRequest cr = new CompactionRequest(filesToCompact);
- cr.setIsMajor(isMajor);
- return this.compact(cr);
- }
-
public CompactionProgress getProgress() {
return this.progress;
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/DefaultCompactor.java Tue Oct 29 00:37:30 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionse
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -94,4 +95,21 @@ public class DefaultCompactor extends Co
}
return newFiles;
}
+
+ /**
+ * Test helper: wraps the given files in a synthetic {@link CompactionRequest} and runs it
+ * through {@link #compact(CompactionRequest)}.
+ * @param filesToCompact the files to compact; they become the selection of the generated
+ * {@link CompactionRequest}.
+ * @param isMajor true to major compact (prune all deletes, max versions, etc)
+ * @return Product of compaction or an empty list if all cells expired or deleted and nothing
+ * made it through the compaction.
+ * @throws IOException
+ */
+ public List<Path> compactForTesting(final Collection<StoreFile> filesToCompact, boolean isMajor)
+ throws IOException {
+ final CompactionRequest testRequest = new CompactionRequest(filesToCompact);
+ testRequest.setIsMajor(isMajor);
+ return compact(testRequest);
+ }
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/ExploringCompactionPolicy.java Tue Oct 29 00:37:30 2013
@@ -53,9 +53,19 @@ public class ExploringCompactionPolicy e
@Override
final ArrayList<StoreFile> applyCompactionPolicy(final ArrayList<StoreFile> candidates,
final boolean mayUseOffPeak, final boolean mightBeStuck) throws IOException {
+ return new ArrayList<StoreFile>(applyCompactionPolicy(candidates, mightBeStuck,
+ mayUseOffPeak, comConf.getMinFilesToCompact(), comConf.getMaxFilesToCompact()));
+ }
+
+ public List<StoreFile> applyCompactionPolicy(final List<StoreFile> candidates,
+ boolean mightBeStuck, boolean mayUseOffPeak, int minFiles, int maxFiles) {
+
+ final double currentRatio = mayUseOffPeak
+ ? comConf.getCompactionRatioOffPeak() : comConf.getCompactionRatio();
+
// Start off choosing nothing.
List<StoreFile> bestSelection = new ArrayList<StoreFile>(0);
- List<StoreFile> smallest = new ArrayList<StoreFile>(0);
+ List<StoreFile> smallest = mightBeStuck ? new ArrayList<StoreFile>(0) : null;
long bestSize = 0;
long smallestSize = Long.MAX_VALUE;
@@ -63,15 +73,15 @@ public class ExploringCompactionPolicy e
// Consider every starting place.
for (int start = 0; start < candidates.size(); start++) {
// Consider every different sub list permutation in between start and end with min files.
- for (int currentEnd = start + comConf.getMinFilesToCompact() - 1;
+ for (int currentEnd = start + minFiles - 1;
currentEnd < candidates.size(); currentEnd++) {
List<StoreFile> potentialMatchFiles = candidates.subList(start, currentEnd + 1);
// Sanity checks
- if (potentialMatchFiles.size() < comConf.getMinFilesToCompact()) {
+ if (potentialMatchFiles.size() < minFiles) {
continue;
}
- if (potentialMatchFiles.size() > comConf.getMaxFilesToCompact()) {
+ if (potentialMatchFiles.size() > maxFiles) {
continue;
}
@@ -81,7 +91,7 @@ public class ExploringCompactionPolicy e
// Store the smallest set of files. This stored set of files will be used
// if it looks like the algorithm is stuck.
- if (size < smallestSize) {
+ if (mightBeStuck && size < smallestSize) {
smallest = potentialMatchFiles;
smallestSize = size;
}
@@ -92,7 +102,7 @@ public class ExploringCompactionPolicy e
++opts;
if (size >= comConf.getMinCompactSize()
- && !filesInRatio(potentialMatchFiles, mayUseOffPeak)) {
+ && !filesInRatio(potentialMatchFiles, currentRatio)) {
continue;
}
@@ -150,15 +160,13 @@ public class ExploringCompactionPolicy e
* FileSize(i) <= ( Sum(0,N,FileSize(_)) - FileSize(i) ) * Ratio.
*
* @param files List of store files to consider as a compaction candidate.
- * @param isOffPeak should the offPeak compaction ratio be used ?
+ * @param currentRatio The ratio to use.
* @return a boolean if these files satisfy the ratio constraints.
*/
- private boolean filesInRatio(final List<StoreFile> files, final boolean isOffPeak) {
+ private boolean filesInRatio(final List<StoreFile> files, final double currentRatio) {
if (files.size() < 2) {
- return true;
+ return true;
}
- final double currentRatio =
- isOffPeak ? comConf.getCompactionRatioOffPeak() : comConf.getCompactionRatio();
long totalFileSize = getTotalStoreSize(files);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/RatioBasedCompactionPolicy.java Tue Oct 29 00:37:30 2013
@@ -388,7 +388,6 @@ public class RatioBasedCompactionPolicy
return compactionSize > comConf.getThrottlePoint();
}
- @Override
public boolean needsCompaction(final Collection<StoreFile> storeFiles,
final List<StoreFile> filesCompacting) {
int numCandidates = storeFiles.size() - filesCompacting.size();
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java?rev=1536569&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java Tue Oct 29 00:37:30 2013
@@ -0,0 +1,563 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StoreUtils;
+import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ConcatenatedLists;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.collect.ImmutableList;
+
+/**
+ * Stripe store implementation of compaction policy.
+ */
+@InterfaceAudience.Private
+public class StripeCompactionPolicy extends CompactionPolicy {
+ private final static Log LOG = LogFactory.getLog(StripeCompactionPolicy.class);
+ // Policy used to compact individual stripes.
+ private ExploringCompactionPolicy stripePolicy = null;
+
+ private StripeStoreConfig config;
+
+ /**
+ * Creates the stripe policy and the per-stripe {@link ExploringCompactionPolicy} it delegates to.
+ * @param conf Configuration handed to the base policy and to the per-stripe policy.
+ * @param storeConfigInfo Store configuration handed to the base policy and to the per-stripe policy.
+ * @param config Stripe-specific settings kept for later selection decisions.
+ */
+ public StripeCompactionPolicy(
+ Configuration conf, StoreConfigInformation storeConfigInfo, StripeStoreConfig config) {
+ super(conf, storeConfigInfo);
+ this.stripePolicy = new ExploringCompactionPolicy(conf, storeConfigInfo);
+ this.config = config;
+ }
+
+ /**
+ * Builds the candidate list offered to coprocessors: all store files reported by the
+ * provider, minus the ones that are already being compacted.
+ * @param si Stripe information provider.
+ * @param filesCompacting Files currently compacting; they are excluded from the result.
+ * @return A new list with the remaining candidate files.
+ */
+ public List<StoreFile> preSelectFilesForCoprocessor(StripeInformationProvider si,
+ List<StoreFile> filesCompacting) {
+ // Coprocessors are not expected to second-guess the stripe selection; one that does is
+ // very likely to hurt itself. The only filtering done here is dropping filesCompacting.
+ ArrayList<StoreFile> remaining = new ArrayList<StoreFile>(si.getStorefiles());
+ remaining.removeAll(filesCompacting);
+ return remaining;
+ }
+
+ /**
+ * Wraps an externally supplied request (fixed set of files) into a stripe request.
+ * With existing stripes the output is written along the current stripe boundaries;
+ * without stripes the files are split by an estimated per-stripe KV count.
+ * @param si Stripe information provider.
+ * @param request The request to wrap.
+ * @return The stripe compaction request wrapping {@code request}.
+ */
+ public StripeCompactionRequest createEmptyRequest(
+ StripeInformationProvider si, CompactionRequest request) {
+ // Treat as L0-ish compaction with fixed set of files, and hope for the best.
+ if (si.getStripeCount() <= 0) {
+ int initialStripes = this.config.getInitialCount();
+ long kvsPerStripe = estimateTargetKvs(request.getFiles(), initialStripes).getFirst();
+ return new SplitStripeCompactionRequest(request, OPEN_KEY, OPEN_KEY, kvsPerStripe);
+ }
+ return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
+ }
+
+ /**
+ * Selects the next compaction, trying the candidates in a fixed order of preference:
+ * compact everything if the store has references; create initial stripes from L0 if there
+ * are no stripes; compact L0 (possibly together with one whole stripe); merge expired
+ * stripes; and finally an ordinary single-stripe compaction.
+ * @param si Stripe information provider.
+ * @param filesCompacting Files currently compacting; if non-empty nothing is selected.
+ * @param isOffpeak Whether off-peak settings may be used for the single-stripe selection.
+ * @return The selected request, or null if nothing should be compacted.
+ */
+ public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
+ List<StoreFile> filesCompacting, boolean isOffpeak) throws IOException {
+ // TODO: first cut - no parallel compactions. To have more fine grained control we
+ // probably need structure more sophisticated than a list.
+ if (!filesCompacting.isEmpty()) {
+ LOG.debug("Not selecting compaction: " + filesCompacting.size() + " files compacting");
+ return null;
+ }
+
+ // The variations below are attempted in strict order of preference. A more advanced
+ // approach would use a heuristic to decide which one is "more necessary" right now.
+
+ // References can appear due to region split. This could be skipped later; for now the
+ // compact-all-things behavior is preserved.
+ Collection<StoreFile> storeFiles = si.getStorefiles();
+ if (StoreUtils.hasReferences(storeFiles)) {
+ LOG.debug("There are references in the store; compacting all files");
+ long kvsPerStripe = estimateTargetKvs(storeFiles, config.getSplitCount()).getFirst();
+ SplitStripeCompactionRequest compactAll = new SplitStripeCompactionRequest(
+ storeFiles, OPEN_KEY, OPEN_KEY, kvsPerStripe);
+ compactAll.setMajorRangeFull();
+ return compactAll;
+ }
+
+ int stripeCount = si.getStripeCount();
+ List<StoreFile> l0Files = si.getLevel0Files();
+
+ // See if we need to make new stripes.
+ boolean l0NeedsCompaction = (this.config.getLevel0MinFiles() <= l0Files.size());
+ if (stripeCount == 0) {
+ // Without stripes the only possible action is creating them from L0.
+ return l0NeedsCompaction ? selectNewStripesCompaction(si) : null;
+ }
+
+ boolean dropDeletesWithoutL0 = l0Files.size() == 0;
+ if (l0NeedsCompaction) {
+ if (!dropDeletesWithoutL0) {
+ // L0 has to be compacted anyway; try to attach a whole stripe so deletes can be dropped.
+ StripeCompactionRequest withStripe = selectSingleStripeCompaction(
+ si, true, dropDeletesWithoutL0, isOffpeak);
+ if (withStripe != null) {
+ return withStripe;
+ }
+ }
+ LOG.debug("Selecting L0 compaction with " + l0Files.size() + " files");
+ return new BoundaryStripeCompactionRequest(l0Files, si.getStripeBoundaries());
+ }
+
+ // Try to delete fully expired stripes
+ StripeCompactionRequest expiredMerge = selectExpiredMergeCompaction(si, dropDeletesWithoutL0);
+ if (expiredMerge != null) {
+ return expiredMerge;
+ }
+
+ // Nothing special applies; fall back to a common compaction, which also splits
+ // stripes that have grown too big.
+ return selectSingleStripeCompaction(si, false, dropDeletesWithoutL0, isOffpeak);
+ }
+
+ /**
+ * Approximates whether a compaction is needed: nothing may be compacting already, and
+ * either the store has references, L0 has reached its minimum file count, or some stripe
+ * has enough files for a single-stripe compaction.
+ * @param si Stripe information provider.
+ * @param filesCompacting Files currently compacting.
+ * @return Whether a compaction appears to be needed.
+ */
+ public boolean needsCompactions(StripeInformationProvider si, List<StoreFile> filesCompacting) {
+ // This is an approximation only; actual selection may still come back empty.
+ if (!filesCompacting.isEmpty()) {
+ return false;
+ }
+ if (StoreUtils.hasReferences(si.getStorefiles())) {
+ return true;
+ }
+ if (si.getLevel0Files().size() >= this.config.getLevel0MinFiles()) {
+ return true;
+ }
+ return needsSingleStripeCompaction(si);
+ }
+
+ /**
+ * Always answers {@code false}; the argument is not inspected.
+ * @param filesToCompact Files proposed for compaction (unused).
+ * @return {@code false}.
+ */
+ @Override
+ public boolean isMajorCompaction(Collection<StoreFile> filesToCompact) throws IOException {
+ return false; // there's never a major compaction!
+ }
+
+ /**
+ * @param compactionSize Size of the compaction being considered.
+ * @return Whether {@code compactionSize} exceeds the configured throttle point.
+ */
+ @Override
+ public boolean throttleCompaction(long compactionSize) {
+ final long throttlePoint = comConf.getThrottlePoint();
+ return throttlePoint < compactionSize;
+ }
+
+ /**
+ * @param si StoreFileManager.
+ * @return Whether any stripe potentially needs compaction.
+ */
+ protected boolean needsSingleStripeCompaction(StripeInformationProvider si) {
+ int minFiles = this.config.getStripeCompactMinFiles();
+ for (List<StoreFile> stripe : si.getStripes()) {
+ if (stripe.size() >= minFiles) return true;
+ }
+ return false;
+ }
+
+ /**
+ * Picks the best single-stripe compaction across all stripes and turns it into a request.
+ * "Best" is the per-stripe selection with the most files; ties go to the smaller total size.
+ * If the chosen selection covers its whole stripe and reaches the configured split size, the
+ * stripe is split as part of the compaction, unless L0 is being included, in which case
+ * nothing is selected.
+ * @param si Stripe information provider.
+ * @param includeL0 Whether the L0 files are to be compacted together with the chosen stripe.
+ * @param canDropDeletesWithoutL0 Whether deletes can be dropped without including L0.
+ * @param isOffpeak Passed to the per-stripe selection and recorded on the resulting request.
+ * @return The request, or null if no stripe produced a selection (or a split would be needed
+ * while including L0).
+ */
+ protected StripeCompactionRequest selectSingleStripeCompaction(StripeInformationProvider si,
+ boolean includeL0, boolean canDropDeletesWithoutL0, boolean isOffpeak) throws IOException {
+ ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();
+
+ // "bq" variables track the best selection found so far: its stripe index, files and size.
+ int bqIndex = -1;
+ List<StoreFile> bqSelection = null;
+ int stripeCount = stripes.size();
+ long bqTotalSize = -1;
+ for (int i = 0; i < stripeCount; ++i) {
+ // If we want to compact L0 to drop deletes, we only want whole-stripe compactions.
+ // So, pass includeL0 as 2nd parameter to indicate that.
+ List<StoreFile> selection = selectSimpleCompaction(stripes.get(i),
+ !canDropDeletesWithoutL0 && includeL0, isOffpeak);
+ if (selection.isEmpty()) continue;
+ long size = 0;
+ for (StoreFile sf : selection) {
+ size += sf.getReader().length();
+ }
+ // Prefer more files; on equal file count prefer the smaller total size.
+ if (bqSelection == null || selection.size() > bqSelection.size() ||
+ (selection.size() == bqSelection.size() && size < bqTotalSize)) {
+ bqSelection = selection;
+ bqIndex = i;
+ bqTotalSize = size;
+ }
+ }
+ if (bqSelection == null) {
+ LOG.debug("No good compaction is possible in any stripe");
+ return null;
+ }
+ List<StoreFile> filesToCompact = new ArrayList<StoreFile>(bqSelection);
+ // See if we can, and need to, split this stripe.
+ // Defaults mean "no split": a single output with no KV limit.
+ int targetCount = 1;
+ long targetKvs = Long.MAX_VALUE;
+ boolean hasAllFiles = filesToCompact.size() == stripes.get(bqIndex).size();
+ String splitString = "";
+ if (hasAllFiles && bqTotalSize >= config.getSplitSize()) {
+ if (includeL0) {
+ // We want to avoid the scenario where we compact a stripe w/L0 and then split it.
+ // So, if we might split, don't compact the stripe with L0.
+ return null;
+ }
+ Pair<Long, Integer> kvsAndCount = estimateTargetKvs(filesToCompact, config.getSplitCount());
+ targetKvs = kvsAndCount.getFirst();
+ targetCount = kvsAndCount.getSecond();
+ splitString = "; the stripe will be split into at most "
+ + targetCount + " stripes with " + targetKvs + " target KVs";
+ }
+
+ LOG.debug("Found compaction in a stripe with end key ["
+ + Bytes.toString(si.getEndRow(bqIndex)) + "], with "
+ + filesToCompact.size() + " files of total size " + bqTotalSize + splitString);
+
+ // See if we can drop deletes.
+ StripeCompactionRequest req;
+ if (includeL0) {
+ assert hasAllFiles;
+ List<StoreFile> l0Files = si.getLevel0Files();
+ LOG.debug("Adding " + l0Files.size() + " files to compaction to be able to drop deletes");
+ // Stripe files plus L0 files, written out along all existing stripe boundaries.
+ ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
+ sfs.addSublist(filesToCompact);
+ sfs.addSublist(l0Files);
+ req = new BoundaryStripeCompactionRequest(sfs, si.getStripeBoundaries());
+ } else {
+ // Stripe files only, confined to (and possibly split within) the stripe's own row range.
+ req = new SplitStripeCompactionRequest(
+ filesToCompact, si.getStartRow(bqIndex), si.getEndRow(bqIndex), targetCount, targetKvs);
+ }
+ if (canDropDeletesWithoutL0 || includeL0) {
+ // Mark the chosen stripe's row range as the major range of the request.
+ req.setMajorRange(si.getStartRow(bqIndex), si.getEndRow(bqIndex));
+ }
+ req.getRequest().setOffPeak(isOffpeak);
+ return req;
+ }
+
+ /**
+ * Selects the compaction of a single stripe using default policy.
+ * @param sfs Files.
+ * @param allFilesOnly Whether a compaction of all-or-none files is needed.
+ * @param isOffpeak Whether the off-peak compaction ratio may be used for the selection.
+ * @return The resulting selection.
+ */
+ private List<StoreFile> selectSimpleCompaction(
+ List<StoreFile> sfs, boolean allFilesOnly, boolean isOffpeak) {
+ // For all-or-none selection the minimum is raised to the stripe's file count, and the
+ // maximum is raised along with it so the whole stripe remains a valid selection.
+ int minFilesLocal = Math.max(
+ allFilesOnly ? sfs.size() : 0, this.config.getStripeCompactMinFiles());
+ int maxFilesLocal = Math.max(this.config.getStripeCompactMaxFiles(), minFilesLocal);
+ // ExploringCompactionPolicy.applyCompactionPolicy takes (candidates, mightBeStuck,
+ // mayUseOffPeak, minFiles, maxFiles). A stripe is never treated as stuck here, and the
+ // off-peak flag must be the third argument so that the off-peak ratio is actually applied.
+ return stripePolicy.applyCompactionPolicy(sfs, false, isOffpeak, minFilesLocal, maxFilesLocal);
+ }
+
+ /**
+ * Selects the compaction that compacts all files (to be removed later).
+ * The request spans the whole key range and its major range is the entire range.
+ * @param si StoreFileManager.
+ * @param targetStripeCount Target stripe count.
+ * @param targetSize Target stripe size; forwarded as the request's targetKvs argument.
+ * @return The compaction.
+ */
+ private StripeCompactionRequest selectCompactionOfAllFiles(StripeInformationProvider si,
+ int targetStripeCount, long targetSize) {
+ Collection<StoreFile> everyFile = si.getStorefiles();
+ SplitStripeCompactionRequest wholeStore = new SplitStripeCompactionRequest(
+ everyFile, OPEN_KEY, OPEN_KEY, targetStripeCount, targetSize);
+ wholeStore.setMajorRangeFull();
+ LOG.debug("Selecting a compaction that includes all " + everyFile.size() + " files");
+ return wholeStore;
+ }
+
+ /**
+ * Selects the L0 compaction that creates the initial stripes. The request spans the whole
+ * key range and marks all of it as the major range.
+ * @param si Stripe information provider.
+ * @return The compaction.
+ */
+ private StripeCompactionRequest selectNewStripesCompaction(StripeInformationProvider si) {
+ List<StoreFile> level0 = si.getLevel0Files();
+ Pair<Long, Integer> estimate = estimateTargetKvs(level0, config.getInitialCount());
+ LOG.debug("Creating " + estimate.getSecond() + " initial stripes with "
+ + estimate.getFirst() + " kvs each via L0 compaction of " + level0.size() + " files");
+ SplitStripeCompactionRequest initialStripes = new SplitStripeCompactionRequest(
+ si.getLevel0Files(), OPEN_KEY, OPEN_KEY, estimate.getSecond(), estimate.getFirst());
+ // Only L0 exists at this point, so deletes can be dropped over the entire range.
+ initialStripes.setMajorRangeFull();
+ return initialStripes;
+ }
+
+ /**
+ * Looks for the longest run of adjacent stripes in which every file's max timestamp is older
+ * than the TTL cutoff, and selects a compaction merging that run into a single stripe.
+ * A run of exactly one stripe is extended by the stripe that follows it; if that lone stripe
+ * is the last one, nothing is selected.
+ * @param si Stripe information provider.
+ * @param canDropDeletesNoL0 Whether the request's full range may be marked as major range.
+ * @return The merge request, or null if the TTL is Long.MAX_VALUE or no suitable run exists.
+ */
+ private StripeCompactionRequest selectExpiredMergeCompaction(
+ StripeInformationProvider si, boolean canDropDeletesNoL0) {
+ long cfTtl = this.storeConfigInfo.getStoreFileTtl();
+ if (cfTtl == Long.MAX_VALUE) {
+ return null; // minversion might be set, cannot delete old files
+ }
+ long timestampCutoff = EnvironmentEdgeManager.currentTimeMillis() - cfTtl;
+ // Merge the longest sequence of stripes where all files have expired, if any.
+ // start/length describe the run currently being scanned; bestStart/bestLength the longest so far.
+ int start = -1, bestStart = -1, length = 0, bestLength = 0;
+ ArrayList<ImmutableList<StoreFile>> stripes = si.getStripes();
+ OUTER: for (int i = 0; i < stripes.size(); ++i) {
+ for (StoreFile storeFile : stripes.get(i)) {
+ if (storeFile.getReader().getMaxTimestamp() < timestampCutoff) continue;
+ // Found non-expired file, this stripe has to stay.
+ if (length > bestLength) {
+ bestStart = start;
+ bestLength = length;
+ }
+ // Reset the current run and move on to the next stripe.
+ start = -1;
+ length = 0;
+ continue OUTER;
+ }
+ // Every file in stripe i is expired (a stripe with no files also lands here): extend the run.
+ if (start == -1) {
+ start = i;
+ }
+ ++length;
+ }
+ // Account for a run that reaches the last stripe.
+ if (length > bestLength) {
+ bestStart = start;
+ bestLength = length;
+ }
+ if (bestLength == 0) return null;
+ if (bestLength == 1) {
+ // This is currently inefficient. If only one stripe expired, we will rewrite some
+ // entire stripe just to delete some expired files because we rely on metadata and it
+ // cannot simply be updated in an old file. When we either determine stripe dynamically
+ // or move metadata to manifest, we can just drop the "expired stripes".
+ // The lone expired stripe is merged with the stripe after it; the last stripe has none.
+ if (bestStart == (stripes.size() - 1)) return null;
+ ++bestLength;
+ }
+ LOG.debug("Merging " + bestLength + " stripes to delete expired store files");
+ int endIndex = bestStart + bestLength - 1;
+ ConcatenatedLists<StoreFile> sfs = new ConcatenatedLists<StoreFile>();
+ sfs.addAllSublists(stripes.subList(bestStart, endIndex + 1));
+ // Target count 1 with no KV limit: everything in the range goes into one output stripe.
+ SplitStripeCompactionRequest result = new SplitStripeCompactionRequest(sfs,
+ si.getStartRow(bestStart), si.getEndRow(endIndex), 1, Long.MAX_VALUE);
+ if (canDropDeletesNoL0) {
+ result.setMajorRangeFull();
+ }
+ return result;
+ }
+
+ /**
+ * @param candidates Store files to inspect.
+ * @return Sum of the entry counts reported by the readers of the given files.
+ */
+ private static long getTotalKvCount(final Collection<StoreFile> candidates) {
+ long kvTotal = 0;
+ for (StoreFile candidate : candidates) {
+ kvTotal += candidate.getReader().getEntries();
+ }
+ return kvTotal;
+ }
+
+ /**
+ * @param candidates Store files to inspect.
+ * @return Sum of the lengths reported by the readers of the given files.
+ */
+ private static long getTotalFileSize(final Collection<StoreFile> candidates) {
+ long lengthTotal = 0;
+ for (StoreFile candidate : candidates) {
+ lengthTotal += candidate.getReader().length();
+ }
+ return lengthTotal;
+ }
+
+ private Pair<Long, Integer> estimateTargetKvs(Collection<StoreFile> files, double splitCount) {
+ // If the size is larger than what we target, we don't want to split into proportionally
+ // larger parts and then have to split again very soon. So, we will increase the multiplier
+ // by one until we get small enough parts. E.g. 5Gb stripe that should have been split into
+ // 2 parts when it was 3Gb will be split into 3x1.67Gb parts, rather than 2x2.5Gb parts.
+ long totalSize = getTotalFileSize(files);
+ long targetPartSize = config.getSplitPartSize();
+ assert targetPartSize > 0 && splitCount > 0;
+ double ratio = totalSize / (splitCount * targetPartSize); // ratio of real to desired size
+ while (ratio > 1.0) {
+ // Ratio of real to desired size if we increase the multiplier.
+ double newRatio = totalSize / ((splitCount + 1.0) * targetPartSize);
+ if ((1.0 / newRatio) >= ratio) break; // New ratio is < 1.0, but further than the last one.
+ ratio = newRatio;
+ splitCount += 1.0;
+ }
+ long kvCount = (long)(getTotalKvCount(files) / splitCount);
+ return new Pair<Long, Integer>(kvCount, (int)Math.ceil(splitCount));
+ }
+
+ /**
+ * Wraps a {@link CompactionRequest} together with an optional "major range" and knows which
+ * {@link StripeCompactor} overload has to be called to execute it.
+ */
+ public abstract static class StripeCompactionRequest {
+ protected CompactionRequest request;
+ protected byte[] majorRangeFromRow = null;
+ protected byte[] majorRangeToRow = null;
+
+ public StripeCompactionRequest(CompactionRequest request) {
+ this.request = request;
+ }
+
+ /**
+ * Runs this request on the given compactor by calling the matching overload of its
+ * compact method; this stands in for more dynamic dispatch.
+ * @param compactor Compactor.
+ * @return result of compact(...)
+ */
+ public abstract List<Path> execute(StripeCompactor compactor);
+
+ public CompactionRequest getRequest() {
+ return this.request;
+ }
+
+ /**
+ * Replaces the wrapped request and clears any previously set major range.
+ * @param request The new request.
+ */
+ public void setRequest(CompactionRequest request) {
+ assert request != null;
+ this.request = request;
+ this.majorRangeFromRow = null;
+ this.majorRangeToRow = null;
+ }
+
+ /**
+ * Sets compaction "major range". Major range is the key range for which all
+ * the files are included, so they can be treated like major-compacted files.
+ * @param startRow Left boundary, inclusive.
+ * @param endRow Right boundary, exclusive.
+ */
+ public void setMajorRange(byte[] startRow, byte[] endRow) {
+ this.majorRangeFromRow = startRow;
+ this.majorRangeToRow = endRow;
+ }
+ }
+
+ /**
+ * Request for stripe compactor that makes it write the compacted data into several
+ * separate files, cut at the boundaries supplied with the request.
+ */
+ private static class BoundaryStripeCompactionRequest extends StripeCompactionRequest {
+ private final List<byte[]> targetBoundaries;
+
+ /**
+ * @param files Files to compact; wrapped into a new {@link CompactionRequest}.
+ * @param targetBoundaries New files should be written with these boundaries.
+ */
+ public BoundaryStripeCompactionRequest(Collection<StoreFile> files,
+ List<byte[]> targetBoundaries) {
+ this(new CompactionRequest(files), targetBoundaries);
+ }
+
+ /**
+ * @param request Original request.
+ * @param targetBoundaries New files should be written with these boundaries.
+ */
+ public BoundaryStripeCompactionRequest(CompactionRequest request,
+ List<byte[]> targetBoundaries) {
+ super(request);
+ this.targetBoundaries = targetBoundaries;
+ }
+
+ @Override
+ public List<Path> execute(StripeCompactor compactor) {
+ return compactor.compact(request, targetBoundaries, majorRangeFromRow, majorRangeToRow);
+ }
+ }
+
+ /**
+ * Request for stripe compactor that makes it split the source files into several separate
+ * files based on a key-value count, subject to a file count limit.
+ * Most of the files will be roughly the same size. The last file may be smaller or larger
+ * depending on the interplay of the amount of data and maximum number of files allowed.
+ */
+ private static class SplitStripeCompactionRequest extends StripeCompactionRequest {
+ private final byte[] startRow;
+ private final byte[] endRow;
+ private final int targetCount;
+ private final long targetKvs;
+
+ /**
+ * @param request Original request.
+ * @param startRow Left boundary of the range to compact, inclusive.
+ * @param endRow Right boundary of the range to compact, exclusive.
+ * @param targetCount The maximum number of stripe to compact into.
+ * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
+ * total number of kvs, all the overflow data goes into the last stripe.
+ */
+ public SplitStripeCompactionRequest(CompactionRequest request,
+ byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
+ super(request);
+ this.targetKvs = targetKvs;
+ this.targetCount = targetCount;
+ this.startRow = startRow;
+ this.endRow = endRow;
+ }
+
+ /** Same as the five-argument form, with the stripe count limited to Integer.MAX_VALUE. */
+ public SplitStripeCompactionRequest(
+ CompactionRequest request, byte[] startRow, byte[] endRow, long targetKvs) {
+ this(request, startRow, endRow, Integer.MAX_VALUE, targetKvs);
+ }
+
+ /** Wraps the files into a new request; the stripe count is limited to Integer.MAX_VALUE. */
+ public SplitStripeCompactionRequest(
+ Collection<StoreFile> files, byte[] startRow, byte[] endRow, long targetKvs) {
+ this(new CompactionRequest(files), startRow, endRow, Integer.MAX_VALUE, targetKvs);
+ }
+
+ /** Wraps the files into a new request and otherwise behaves like the request-based form. */
+ public SplitStripeCompactionRequest(Collection<StoreFile> files,
+ byte[] startRow, byte[] endRow, int targetCount, long targetKvs) {
+ this(new CompactionRequest(files), startRow, endRow, targetCount, targetKvs);
+ }
+
+ @Override
+ public List<Path> execute(StripeCompactor compactor) {
+ return compactor.compact(request, targetCount, targetKvs,
+ startRow, endRow, majorRangeFromRow, majorRangeToRow);
+ }
+
+ /** Set major range of the compaction to the entire compaction range.
+ * See {@link #setMajorRange(byte[], byte[])}. */
+ public void setMajorRangeFull() {
+ setMajorRange(startRow, endRow);
+ }
+ }
+
+ /**
+ * Helper that gathers per-stripe KV counts and file sizes, together with the running
+ * average, minimum and maximum KV count and the indexes of the min/max stripes.
+ */
+ private static class StripeSizes {
+ public final ArrayList<Long> kvCounts;
+ public final ArrayList<Long> fileSizes;
+ public double avgKvCount = 0;
+ public long minKvCount = Long.MAX_VALUE;
+ public long maxKvCount = Long.MIN_VALUE;
+ public int minIndex = -1;
+ public int maxIndex = -1;
+
+ public StripeSizes(List<ImmutableList<StoreFile>> stripes) {
+ assert !stripes.isEmpty();
+ final int stripeTotal = stripes.size();
+ kvCounts = new ArrayList<Long>(stripeTotal);
+ fileSizes = new ArrayList<Long>(stripeTotal);
+ for (int idx = 0; idx < stripeTotal; ++idx) {
+ ImmutableList<StoreFile> stripe = stripes.get(idx);
+ long stripeKvs = getTotalKvCount(stripe);
+ fileSizes.add(getTotalFileSize(stripe));
+ kvCounts.add(stripeKvs);
+ // Incremental mean: shift the average by this stripe's share of the difference.
+ avgKvCount += (double)(stripeKvs - avgKvCount) / (idx + 1);
+ if (stripeKvs < minKvCount) {
+ minIndex = idx;
+ minKvCount = stripeKvs;
+ }
+ if (stripeKvs > maxKvCount) {
+ maxIndex = idx;
+ maxKvCount = stripeKvs;
+ }
+ }
+ }
+ }
+
+ /** Read-only view of the stripe layout that the policy consults when selecting compactions. */
+ public interface StripeInformationProvider {
+ /**
+ * @return The store files known to the provider.
+ */
+ Collection<StoreFile> getStorefiles();
+
+ /**
+ * Gets the start row for a given stripe.
+ * @param stripeIndex Stripe index.
+ * @return Start row. May be an open key.
+ */
+ byte[] getStartRow(int stripeIndex);
+
+ /**
+ * Gets the end row for a given stripe.
+ * @param stripeIndex Stripe index.
+ * @return End row. May be an open key.
+ */
+ byte[] getEndRow(int stripeIndex);
+
+ /**
+ * @return Level 0 files.
+ */
+ List<StoreFile> getLevel0Files();
+
+ /**
+ * @return All stripe boundaries; including the open ones on both ends.
+ */
+ List<byte[]> getStripeBoundaries();
+
+ /**
+ * @return The stripes.
+ */
+ ArrayList<ImmutableList<StoreFile>> getStripes();
+
+ /**
+ * @return Stripe count.
+ */
+ int getStripeCount();
+ }
+}
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java?rev=1536569&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactor.java Tue Oct 29 00:37:30 2013
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.compactions;
+
+import java.util.List;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.regionserver.Store;
+
+/**
+ * This is the placeholder for stripe compactor. The implementation,
+ * as well as the proper javadoc, will be added in HBASE-7967.
+ */
+public class StripeCompactor extends Compactor {
+
+ public StripeCompactor(Configuration conf, final Store store) {
+ super(conf, store);
+ }
+
+ public List<Path> compact(CompactionRequest request, List<byte[]> targetBoundaries,
+ byte[] dropDeletesFromRow, byte[] dropDeletesToRow) {
+ throw new NotImplementedException();
+ }
+
+ public List<Path> compact(CompactionRequest request, int targetCount, long targetSize,
+ byte[] left, byte[] right, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) {
+ throw new NotImplementedException();
+ }
+}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Tue Oct 29 00:37:30 2013
@@ -73,6 +73,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.regionserver.compactions.DefaultCompactor;
import org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.util.Bytes;
@@ -644,7 +645,7 @@ public class TestCompaction {
HStore store = (HStore) r.getStore(COLUMN_FAMILY);
Collection<StoreFile> storeFiles = store.getStorefiles();
- Compactor tool = store.storeEngine.getCompactor();
+ DefaultCompactor tool = (DefaultCompactor)store.storeEngine.getCompactor();
List<Path> newFiles = tool.compactForTesting(storeFiles, false);
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java?rev=1536569&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreEngine.java Tue Oct 29 00:37:30 2013
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.*;
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionContext;
+import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
+import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
+import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactor;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(SmallTests.class)
+public class TestStripeStoreEngine { // tests StripeStoreEngine creation and its compaction context
+
+ @Test
+ public void testCreateBasedOnConfig() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, TestStoreEngine.class.getName());
+ StripeStoreEngine se = createEngine(conf); // createEngine casts the created engine to TestStoreEngine
+ assertTrue(se.getCompactionPolicy() instanceof StripeCompactionPolicy);
+ }
+
+ public static class TestStoreEngine extends StripeStoreEngine { // lets tests swap in a mock compactor
+ public void setCompactorOverride(StripeCompactor compactorOverride) {
+ this.compactor = compactorOverride; // overwrites the engine's compactor field
+ }
+ }
+
+ @Test
+ public void testCompactionContextForceSelect() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ int targetCount = 2; // also the count expected in the compactor call verified at the end
+ conf.setInt(StripeStoreConfig.INITIAL_STRIPE_COUNT_KEY, targetCount);
+ conf.setInt(StripeStoreConfig.MIN_FILES_L0_KEY, 2);
+ conf.set(StoreEngine.STORE_ENGINE_CLASS_KEY, TestStoreEngine.class.getName());
+ TestStoreEngine se = createEngine(conf);
+ StripeCompactor mockCompactor = mock(StripeCompactor.class);
+ se.setCompactorOverride(mockCompactor);
+ when(mockCompactor.compact(any(CompactionRequest.class), anyInt(), anyLong(),
+ any(byte[].class), any(byte[].class), any(byte[].class), any(byte[].class)))
+ .thenReturn(new ArrayList<Path>()); // stub the count/size overload to return an empty list
+
+ // Produce 3 L0 files.
+ StoreFile sf = createFile();
+ ArrayList<StoreFile> compactUs = al(sf, createFile(), createFile());
+ se.getStoreFileManager().loadFiles(compactUs);
+ // Create a compaction that would want to split the stripe.
+ CompactionContext compaction = se.createCompaction();
+ compaction.select(al(), false, false, false); // al() with no arguments is an empty list
+ assertEquals(3, compaction.getRequest().getFiles().size());
+ // Override the file list. Granted, overriding this compaction in this manner will
+ // break things in real world, but we only want to verify the override.
+ compactUs.remove(sf);
+ CompactionRequest req = new CompactionRequest(compactUs);
+ compaction.forceSelect(req);
+ assertEquals(2, compaction.getRequest().getFiles().size());
+ assertFalse(compaction.getRequest().getFiles().contains(sf));
+ // Make sure the correct method is called on compactor.
+ compaction.compact();
+ verify(mockCompactor, times(1)).compact(compaction.getRequest(), targetCount, 0L,
+ StripeStoreFileManager.OPEN_KEY, StripeStoreFileManager.OPEN_KEY, null, null);
+ }
+
+ private static StoreFile createFile() throws Exception { // builds a Mockito-mocked StoreFile
+ StoreFile sf = mock(StoreFile.class);
+ when(sf.getMetadataValue(any(byte[].class)))
+ .thenReturn(StripeStoreFileManager.INVALID_KEY); // every metadata lookup yields INVALID_KEY
+ when(sf.getReader()).thenReturn(mock(StoreFile.Reader.class));
+ when(sf.getPath()).thenReturn(new Path("moo")); // every mock file reports the same path
+ return sf;
+ }
+
+ private static TestStoreEngine createEngine(Configuration conf) throws Exception {
+ Store store = mock(Store.class); // the store and the comparator are both plain mocks
+ KVComparator kvComparator = mock(KVComparator.class);
+ return (TestStoreEngine)StoreEngine.create(store, conf, kvComparator); // cast assumes conf names TestStoreEngine
+ }
+
+ private static ArrayList<StoreFile> al(StoreFile... sfs) { // varargs -> new mutable ArrayList
+ return new ArrayList<StoreFile>(Arrays.asList(sfs)); // copied so callers can remove() elements
+ }
+}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java?rev=1536569&r1=1536568&r2=1536569&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java Tue Oct 29 00:37:30 2013
@@ -43,6 +43,7 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
@Category(SmallTests.class)
@@ -237,7 +238,7 @@ public class TestStripeStoreFileManager
Configuration conf = HBaseConfiguration.create();
if (splitRatioToVerify != 0) {
- conf.setFloat(StripeStoreConfig.MAX_SPLIT_IMBALANCE, splitRatioToVerify);
+ conf.setFloat(StripeStoreConfig.MAX_REGION_SPLIT_IMBALANCE_KEY, splitRatioToVerify);
}
StripeStoreFileManager manager = createManager(al(), conf);
manager.addCompactionResults(al(), sfs);
@@ -536,6 +537,7 @@ public class TestStripeStoreFileManager
verifyGetOrScanScenario(manager, false, null, null, results);
}
+ // TODO: replace with Mockito?
private static MockStoreFile createFile(
long size, long seqNum, byte[] startKey, byte[] endKey) throws Exception {
FileSystem fs = TEST_UTIL.getTestFileSystem();
@@ -573,7 +575,9 @@ public class TestStripeStoreFileManager
private static StripeStoreFileManager createManager(
ArrayList<StoreFile> sfs, Configuration conf) throws Exception {
- StripeStoreFileManager result = new StripeStoreFileManager(new KVComparator(), conf);
+ StripeStoreConfig config = new StripeStoreConfig(
+ conf, Mockito.mock(StoreConfigInformation.class));
+ StripeStoreFileManager result = new StripeStoreFileManager(new KVComparator(), conf, config);
result.loadFiles(sfs);
return result;
}