You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2013/11/06 02:49:36 UTC
svn commit: r1539211 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/
main/java/org/apache/hadoop/hbase/regionserver/compactions/
test/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regio...
Author: sershe
Date: Wed Nov 6 01:49:36 2013
New Revision: 1539211
URL: http://svn.apache.org/r1539211
Log:
HBASE-8541 implement flush-into-stripes in stripe compactions
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFileManager.java Wed Nov 6 01:49:36 2013
@@ -68,9 +68,9 @@ class DefaultStoreFileManager implements
}
@Override
- public void insertNewFile(StoreFile sf) {
+ public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(storefiles);
- newFiles.add(sf);
+ newFiles.addAll(sfs);
sortAndSetStoreFiles(newFiles);
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/DefaultStoreFlusher.java Wed Nov 6 01:49:36 2013
@@ -53,13 +53,7 @@ public class DefaultStoreFlusher extends
// Use a store scanner to find which rows to flush.
long smallestReadPoint = store.getSmallestReadPoint();
- KeyValueScanner memstoreScanner =
- new CollectionBackedScanner(snapshot, store.getComparator());
- InternalScanner scanner = preCreateCoprocScanner(memstoreScanner);
- if (scanner == null) {
- scanner = createStoreScanner(smallestReadPoint, memstoreScanner);
- }
- scanner = postCreateCoprocScanner(scanner);
+ InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
if (scanner == null) {
return result; // NULL scanner returned from coprocessor hooks means skip normal processing
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Wed Nov 6 01:49:36 2013
@@ -611,7 +611,7 @@ public class HStore implements Store {
// Append the new storefile into the list
this.lock.writeLock().lock();
try {
- this.storeEngine.getStoreFileManager().insertNewFile(sf);
+ this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
} finally {
// We need the lock, as long as we are updating the storeFiles
// or changing the memstore. Let us release it before calling
@@ -852,9 +852,7 @@ public class HStore implements Store {
final List<StoreFile> sfs, final SortedSet<KeyValue> set) throws IOException {
this.lock.writeLock().lock();
try {
- for (StoreFile sf : sfs) {
- this.storeEngine.getStoreFileManager().insertNewFile(sf);
- }
+ this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
this.memstore.clearSnapshot(set);
} finally {
// We need the lock, as long as we are updating the storeFiles
@@ -1122,15 +1120,15 @@ public class HStore implements Store {
.append(" to execute.");
LOG.info(message.toString());
if (LOG.isTraceEnabled()) {
- int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
- long resultSize = 0;
- for (StoreFile sf : sfs) {
- resultSize += sf.getReader().length();
- }
- String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
- + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
- + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
- LOG.trace(traceMessage);
+ int fileCount = storeEngine.getStoreFileManager().getStorefileCount();
+ long resultSize = 0;
+ for (StoreFile sf : sfs) {
+ resultSize += sf.getReader().length();
+ }
+ String traceMessage = "COMPACTION start,end,size out,files in,files out,store size,"
+ + "store files [" + compactionStartTime + "," + now + "," + resultSize + ","
+ + cr.getFiles().size() + "," + sfs.size() + "," + storeSize + "," + fileCount + "]";
+ LOG.trace(traceMessage);
}
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileManager.java Wed Nov 6 01:49:36 2013
@@ -47,10 +47,10 @@ public interface StoreFileManager {
void loadFiles(List<StoreFile> storeFiles);
/**
- * Adds new file, either for from MemStore flush or bulk insert, into the structure.
+ * Adds new files, either for from MemStore flush or bulk insert, into the structure.
* @param sf New store file.
*/
- void insertNewFile(StoreFile sf);
+ void insertNewFiles(Collection<StoreFile> sfs) throws IOException;
/**
* Adds compaction results into the structure.
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFlusher.java Wed Nov 6 01:49:36 2013
@@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.KeyValueU
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.compactions.Compactor;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
/**
* Store flusher interface. Turns a snapshot of memstore into a set of store files (usually one).
@@ -77,31 +78,27 @@ abstract class StoreFlusher {
writer.close();
}
- /** Calls coprocessor to create a flush scanner based on memstore scanner */
- protected InternalScanner preCreateCoprocScanner(
- KeyValueScanner memstoreScanner) throws IOException {
- if (store.getCoprocessorHost() != null) {
- return store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner);
- }
- return null;
- }
-
- /** Creates the default flush scanner based on memstore scanner */
- protected InternalScanner createStoreScanner(long smallestReadPoint,
- KeyValueScanner memstoreScanner) throws IOException {
- Scan scan = new Scan();
- scan.setMaxVersions(store.getScanInfo().getMaxVersions());
- return new StoreScanner(store, store.getScanInfo(), scan,
- Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
- smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
- }
/**
- * Calls coprocessor to create a scanner based on default flush scanner
- * @return new or default scanner; if null, flush should not proceed.
+ * Creates the scanner for flushing snapshot. Also calls coprocessors.
+ * @return The scanner; null if coprocessor is canceling the flush.
*/
- protected InternalScanner postCreateCoprocScanner(InternalScanner scanner)
- throws IOException {
+ protected InternalScanner createScanner(SortedSet<KeyValue> snapshot,
+ long smallestReadPoint) throws IOException {
+ KeyValueScanner memstoreScanner =
+ new CollectionBackedScanner(snapshot, store.getComparator());
+ InternalScanner scanner = null;
+ if (store.getCoprocessorHost() != null) {
+ scanner = store.getCoprocessorHost().preFlushScannerOpen(store, memstoreScanner);
+ }
+ if (scanner == null) {
+ Scan scan = new Scan();
+ scan.setMaxVersions(store.getScanInfo().getMaxVersions());
+ scanner = new StoreScanner(store, store.getScanInfo(), scan,
+ Collections.singletonList(memstoreScanner), ScanType.COMPACT_RETAIN_DELETES,
+ smallestReadPoint, HConstants.OLDEST_TIMESTAMP);
+ }
+ assert scanner != null;
if (store.getCoprocessorHost() != null) {
return store.getCoprocessorHost().preFlush(store, scanner);
}
@@ -114,7 +111,7 @@ abstract class StoreFlusher {
* @param sink Sink to write data to. Could be StoreFile.Writer.
* @param smallestReadPoint Smallest read point used for the flush.
* @return Bytes flushed.
-s */
+ */
protected long performFlush(InternalScanner scanner,
Compactor.CellSink sink, long smallestReadPoint) throws IOException {
int compactionKVMax =
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeMultiFileWriter.java Wed Nov 6 01:49:36 2013
@@ -46,6 +46,9 @@ public abstract class StripeMultiFileWri
/** Source scanner that is tracking KV count; may be null if source is not StoreScanner */
protected StoreScanner sourceScanner;
+ /** Whether to write stripe metadata */
+ private boolean doWriteStripeMetadata = true;
+
public interface WriterFactory {
public StoreFile.Writer createWriter() throws IOException;
}
@@ -63,18 +66,24 @@ public abstract class StripeMultiFileWri
this.comparator = comparator;
}
+ public void setNoStripeMetadata() {
+ this.doWriteStripeMetadata = false;
+ }
+
public List<Path> commitWriters(long maxSeqId, boolean isMajor) throws IOException {
assert this.existingWriters != null;
commitWritersInternal();
assert this.boundaries.size() == (this.existingWriters.size() + 1);
- LOG.debug("Writing out metadata for " + this.existingWriters.size() + " writers");
-
+ LOG.debug((this.doWriteStripeMetadata ? "W" : "Not w")
+ + "riting out metadata for " + this.existingWriters.size() + " writers");
List<Path> paths = new ArrayList<Path>();
for (int i = 0; i < this.existingWriters.size(); ++i) {
StoreFile.Writer writer = this.existingWriters.get(i);
if (writer == null) continue; // writer was skipped due to 0 KVs
- writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i));
- writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1));
+ if (doWriteStripeMetadata) {
+ writer.appendFileInfo(StripeStoreFileManager.STRIPE_START_KEY, this.boundaries.get(i));
+ writer.appendFileInfo(StripeStoreFileManager.STRIPE_END_KEY, this.boundaries.get(i + 1));
+ }
writer.appendMetadata(maxSeqId, isMajor);
paths.add(writer.getPath());
writer.close();
@@ -109,8 +118,8 @@ public abstract class StripeMultiFileWri
byte[] left, byte[] row, int rowOffset, int rowLength) throws IOException {
if (StripeStoreFileManager.OPEN_KEY != left &&
comparator.compareRows(row, rowOffset, rowLength, left, 0, left.length) < 0) {
- String error = "The first row is lower than the left boundary of ["
- + Bytes.toString(left) + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
+ String error = "The first row is lower than the left boundary of [" + Bytes.toString(left)
+ + "]: [" + Bytes.toString(row, rowOffset, rowLength) + "]";
LOG.error(error);
throw new IOException(error);
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreConfig.java Wed Nov 6 01:49:36 2013
@@ -56,6 +56,9 @@ public class StripeStoreConfig {
to get this count from the outset and prevent unnecessary splitting. */
public static final String INITIAL_STRIPE_COUNT_KEY = "hbase.store.stripe.initialStripeCount";
+ /** Whether to flush memstore to L0 files, or directly to stripes. */
+ public static final String FLUSH_TO_L0_KEY = "hbase.store.stripe.compaction.flushToL0";
+
/** When splitting region, the maximum size imbalance to allow in an attempt to split at a
stripe boundary, so that no files go to both regions. Most users won't need to change that. */
public static final String MAX_REGION_SPLIT_IMBALANCE_KEY =
@@ -70,36 +73,46 @@ public class StripeStoreConfig {
private final int initialCount;
private final long sizeToSplitAt;
private final float splitPartCount;
+ private final boolean flushIntoL0;
private final long splitPartSize; // derived from sizeToSplitAt and splitPartCount
- private final double EPSILON = 0.001; // good enough for this, not a real epsilon.
+ private static final double EPSILON = 0.001; // good enough for this, not a real epsilon.
public StripeStoreConfig(Configuration config, StoreConfigInformation sci) {
this.level0CompactMinFiles = config.getInt(MIN_FILES_L0_KEY, 4);
this.stripeCompactMinFiles = config.getInt(MIN_FILES_KEY, 3);
this.stripeCompactMaxFiles = config.getInt(MAX_FILES_KEY, 10);
this.maxRegionSplitImbalance = getFloat(config, MAX_REGION_SPLIT_IMBALANCE_KEY, 1.5f, true);
+ this.flushIntoL0 = config.getBoolean(FLUSH_TO_L0_KEY, false);
- this.splitPartCount = getFloat(config, SPLIT_PARTS_KEY, 2.0f, true);
+ float splitPartCount = getFloat(config, SPLIT_PARTS_KEY, 2f, true);
if (Math.abs(splitPartCount - 1.0) < EPSILON) {
- throw new RuntimeException("Split part count cannot be 1: " + this.splitPartCount);
+ LOG.error("Split part count cannot be 1 (" + this.splitPartCount + "), using the default");
+ splitPartCount = 2f;
}
- // TODO: change when no L0.
+ this.splitPartCount = splitPartCount;
// Arbitrary default split size - 4 times the size of one L0 compaction.
+ // If we flush into L0 there's no split compaction, but for default value it is ok.
double flushSize = sci.getMemstoreFlushSize();
if (flushSize == 0) {
flushSize = 128 * 1024 * 1024;
}
long defaultSplitSize = (long)(flushSize * getLevel0MinFiles() * 4 * splitPartCount);
this.sizeToSplitAt = config.getLong(SIZE_TO_SPLIT_KEY, defaultSplitSize);
- this.initialCount = config.getInt(INITIAL_STRIPE_COUNT_KEY, 1);
+ int initialCount = config.getInt(INITIAL_STRIPE_COUNT_KEY, 1);
+ if (initialCount == 0) {
+ LOG.error("Initial stripe count is 0, using the default");
+ initialCount = 1;
+ }
+ this.initialCount = initialCount;
this.splitPartSize = (long)(this.sizeToSplitAt / this.splitPartCount);
}
private static float getFloat(
Configuration config, String key, float defaultValue, boolean moreThanOne) {
float value = config.getFloat(key, defaultValue);
- if (value == 0) {
- LOG.warn(String.format("%s is set to 0; using default value of %f", key, defaultValue));
+ if (value < EPSILON) {
+ LOG.warn(String.format(
+ "%s is set to 0 or negative; using default value of %f", key, defaultValue));
value = defaultValue;
} else if ((value > 1f) != moreThanOne) {
value = 1f / value;
@@ -123,6 +136,10 @@ public class StripeStoreConfig {
return stripeCompactMaxFiles;
}
+ public boolean isUsingL0Flush() {
+ return flushIntoL0;
+ }
+
public long getSplitSize() {
return sizeToSplitAt;
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreEngine.java Wed Nov 6 01:49:36 2013
@@ -38,7 +38,7 @@ import com.google.common.base.Preconditi
* The storage engine that implements the stripe-based store/compaction scheme.
*/
@InterfaceAudience.Private
-public class StripeStoreEngine extends StoreEngine<DefaultStoreFlusher,
+public class StripeStoreEngine extends StoreEngine<StripeStoreFlusher,
StripeCompactionPolicy, StripeCompactor, StripeStoreFileManager> {
static final Log LOG = LogFactory.getLog(StripeStoreEngine.class);
private StripeStoreConfig config;
@@ -58,8 +58,9 @@ public class StripeStoreEngine extends S
Configuration conf, Store store, KVComparator comparator) throws IOException {
this.config = new StripeStoreConfig(conf, store);
this.compactionPolicy = new StripeCompactionPolicy(conf, store, config);
- this.storeFlusher = new DefaultStoreFlusher(conf, store);
this.storeFileManager = new StripeStoreFileManager(comparator, conf, this.config);
+ this.storeFlusher = new StripeStoreFlusher(
+ conf, store, this.compactionPolicy, this.storeFileManager);
this.compactor = new StripeCompactor(conf, store);
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFileManager.java Wed Nov 6 01:49:36 2013
@@ -24,8 +24,10 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
@@ -41,6 +43,8 @@ import org.apache.hadoop.hbase.util.Conc
import com.google.common.collect.ImmutableCollection;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+
/**
* Stripe implementation of StoreFileManager.
@@ -136,15 +140,10 @@ public class StripeStoreFileManager
}
@Override
- public void insertNewFile(StoreFile sf) {
- LOG.debug("New level 0 file: " + sf);
- ArrayList<StoreFile> newFiles = new ArrayList<StoreFile>(state.level0Files);
- insertFileIntoStripe(newFiles, sf);
- ensureLevel0Metadata(sf);
- this.state.level0Files = ImmutableList.copyOf(newFiles);
- ArrayList<StoreFile> newAllFiles = new ArrayList<StoreFile>(state.allFilesCached);
- newAllFiles.add(sf);
- this.state.allFilesCached = ImmutableList.copyOf(newAllFiles);
+ public void insertNewFiles(Collection<StoreFile> sfs) throws IOException {
+ CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(true);
+ cmc.mergeResults(null, sfs);
+ debugDumpState("Added new files");
}
@Override
@@ -304,7 +303,7 @@ public class StripeStoreFileManager
+ " files replaced by " + results.size());
// In order to be able to fail in the middle of the operation, we'll operate on lazy
// copies and apply the result at the end.
- CompactionResultsMergeCopy cmc = new CompactionResultsMergeCopy();
+ CompactionOrFlushMergeCopy cmc = new CompactionOrFlushMergeCopy(false);
cmc.mergeResults(compactedFiles, results);
debugDumpState("Merged compaction results");
}
@@ -628,11 +627,11 @@ public class StripeStoreFileManager
}
/**
- * Non-static helper class for merging compaction results.
- * Since we want to merge them atomically (more or less), it operates on lazy copies, and
- * then applies copies to real lists as necessary.
+ * Non-static helper class for merging compaction or flush results.
+ * Since we want to merge them atomically (more or less), it operates on lazy copies,
+ * then creates a new state object and puts it in place.
*/
- private class CompactionResultsMergeCopy {
+ private class CompactionOrFlushMergeCopy {
private ArrayList<List<StoreFile>> stripeFiles = null;
private ArrayList<StoreFile> level0Files = null;
private ArrayList<byte[]> stripeEndRows = null;
@@ -641,11 +640,13 @@ public class StripeStoreFileManager
private Collection<StoreFile> results = null;
private List<StoreFile> l0Results = new ArrayList<StoreFile>();
+ private final boolean isFlush;
- public CompactionResultsMergeCopy() {
+ public CompactionOrFlushMergeCopy(boolean isFlush) {
// Create a lazy mutable copy (other fields are so lazy they start out as nulls).
this.stripeFiles = new ArrayList<List<StoreFile>>(
StripeStoreFileManager.this.state.stripeFiles);
+ this.isFlush = isFlush;
}
public void mergeResults(Collection<StoreFile> compactedFiles, Collection<StoreFile> results)
@@ -654,8 +655,8 @@ public class StripeStoreFileManager
this.compactedFiles = compactedFiles;
this.results = results;
// Do logical processing.
- removeCompactedFiles();
- TreeMap<byte[], StoreFile> newStripes = processCompactionResults();
+ if (!isFlush) removeCompactedFiles();
+ TreeMap<byte[], StoreFile> newStripes = processResults();
if (newStripes != null) {
processNewCandidateStripes(newStripes);
}
@@ -681,7 +682,7 @@ public class StripeStoreFileManager
}
List<StoreFile> newAllFiles = new ArrayList<StoreFile>(oldState.allFilesCached);
- newAllFiles.removeAll(compactedFiles);
+ if (!isFlush) newAllFiles.removeAll(compactedFiles);
newAllFiles.addAll(results);
newState.allFilesCached = ImmutableList.copyOf(newAllFiles);
return newState;
@@ -689,12 +690,16 @@ public class StripeStoreFileManager
private void updateMetadataMaps() {
StripeStoreFileManager parent = StripeStoreFileManager.this;
- for (StoreFile sf : this.compactedFiles) {
- parent.fileStarts.remove(sf);
- parent.fileEnds.remove(sf);
+ if (!isFlush) {
+ for (StoreFile sf : this.compactedFiles) {
+ parent.fileStarts.remove(sf);
+ parent.fileEnds.remove(sf);
+ }
}
- for (StoreFile sf : this.l0Results) {
- parent.ensureLevel0Metadata(sf);
+ if (this.l0Results != null) {
+ for (StoreFile sf : this.l0Results) {
+ parent.ensureLevel0Metadata(sf);
+ }
}
}
@@ -729,12 +734,14 @@ public class StripeStoreFileManager
* or to the list of new candidate stripes.
* @return New candidate stripes.
*/
- private TreeMap<byte[], StoreFile> processCompactionResults() throws IOException {
+ private TreeMap<byte[], StoreFile> processResults() throws IOException {
TreeMap<byte[], StoreFile> newStripes = null;
for (StoreFile sf : this.results) {
byte[] startRow = startOf(sf), endRow = endOf(sf);
if (isInvalid(endRow) || isInvalid(startRow)) {
- LOG.warn("The newly compacted files doesn't have stripe rows set: " + sf.getPath());
+ if (!isFlush) {
+ LOG.warn("The newly compacted file doesn't have stripes set: " + sf.getPath());
+ }
insertFileIntoStripe(getLevel0Copy(), sf);
this.l0Results.add(sf);
continue;
@@ -775,7 +782,7 @@ public class StripeStoreFileManager
int stripeIndex = findStripeIndexByEndRow(oldEndRow);
if (stripeIndex < 0) {
throw new IOException("An allegedly compacted file [" + oldFile + "] does not belong"
- + " to a known stripe (end row + [" + Bytes.toString(oldEndRow) + "])");
+ + " to a known stripe (end row - [" + Bytes.toString(oldEndRow) + "])");
}
source = getStripeCopy(stripeIndex);
}
@@ -803,6 +810,8 @@ public class StripeStoreFileManager
throw new IOException("Newly created stripes do not cover the entire key space.");
}
+ boolean canAddNewStripes = true;
+ Collection<StoreFile> filesForL0 = null;
if (hasStripes) {
// Determine which stripes will need to be removed because they conflict with new stripes.
// The new boundaries should match old stripe boundaries, so we should get exact matches.
@@ -815,20 +824,49 @@ public class StripeStoreFileManager
}
int removeTo = findStripeIndexByEndRow(lastEndRow);
if (removeTo < 0) throw new IOException("Compaction is trying to add a bad range.");
- // Remove old empty stripes.
- int originalCount = this.stripeFiles.size();
+ // See if there are files in the stripes we are trying to replace.
+ ArrayList<StoreFile> conflictingFiles = new ArrayList<StoreFile>();
for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
- if (!this.stripeFiles.get(removeIndex).isEmpty()) {
- throw new IOException("Compaction intends to create a new stripe that replaces an"
- + " existing one, but the latter contains some files.");
+ conflictingFiles.addAll(this.stripeFiles.get(removeIndex));
+ }
+ if (!conflictingFiles.isEmpty()) {
+ // This can be caused by two things - concurrent flush into stripes, or a bug.
+ // Unfortunately, we cannot tell them apart without looking at timing or something
+ // like that. We will assume we are dealing with a flush and dump it into L0.
+ if (isFlush) {
+ long newSize = StripeCompactionPolicy.getTotalFileSize(newStripes.values());
+ LOG.warn("Stripes were created by a flush, but results of size " + newSize
+ + " cannot be added because the stripes have changed");
+ canAddNewStripes = false;
+ filesForL0 = newStripes.values();
+ } else {
+ long oldSize = StripeCompactionPolicy.getTotalFileSize(conflictingFiles);
+ LOG.info(conflictingFiles.size() + " conflicting files (likely created by a flush) "
+ + " of size " + oldSize + " are moved to L0 due to concurrent stripe change");
+ filesForL0 = conflictingFiles;
}
- if (removeIndex != originalCount - 1) {
- this.stripeEndRows.remove(removeIndex);
+ if (filesForL0 != null) {
+ for (StoreFile sf : filesForL0) {
+ insertFileIntoStripe(getLevel0Copy(), sf);
+ }
+ l0Results.addAll(filesForL0);
+ }
+ }
+
+ if (canAddNewStripes) {
+ // Remove old empty stripes.
+ int originalCount = this.stripeFiles.size();
+ for (int removeIndex = removeTo; removeIndex >= removeFrom; --removeIndex) {
+ if (removeIndex != originalCount - 1) {
+ this.stripeEndRows.remove(removeIndex);
+ }
+ this.stripeFiles.remove(removeIndex);
}
- this.stripeFiles.remove(removeIndex);
}
}
+ if (!canAddNewStripes) return; // Files were already put into L0.
+
// Now, insert new stripes. The total ranges match, so we can insert where we removed.
byte[] previousEndRow = null;
int insertAt = removeFrom;
@@ -838,7 +876,8 @@ public class StripeStoreFileManager
assert !isOpen(previousEndRow);
byte[] startRow = startOf(newStripe.getValue());
if (!rowEquals(previousEndRow, startRow)) {
- throw new IOException("The new stripes produced by compaction are not contiguous");
+ throw new IOException("The new stripes produced by "
+ + (isFlush ? "flush" : "compaction") + " are not contiguous");
}
}
// Add the new stripe.
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java?rev=1539211&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StripeStoreFlusher.java Wed Nov 6 01:49:36 2013
@@ -0,0 +1,171 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.regionserver.StripeStoreFileManager.OPEN_KEY;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Writer;
+import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
+import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy;
+import org.apache.hadoop.hbase.util.CollectionBackedScanner;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Stripe implementation of StoreFlusher. Flushes files either into L0 file w/o metadata, or
+ * into separate striped files, avoiding L0.
+ */
+public class StripeStoreFlusher extends StoreFlusher {
+ private static final Log LOG = LogFactory.getLog(StripeStoreFlusher.class);
+ private final Object flushLock = new Object();
+ private final StripeCompactionPolicy policy;
+ private final StripeCompactionPolicy.StripeInformationProvider stripes;
+
+ public StripeStoreFlusher(Configuration conf, Store store,
+ StripeCompactionPolicy policy, StripeStoreFileManager stripes) {
+ super(conf, store);
+ this.policy = policy;
+ this.stripes = stripes;
+ }
+
+ @Override
+ public List<Path> flushSnapshot(SortedSet<KeyValue> snapshot, long cacheFlushSeqNum,
+ final TimeRangeTracker tracker, AtomicLong flushedSize, MonitoredTask status)
+ throws IOException {
+ List<Path> result = null;
+ int kvCount = snapshot.size();
+ if (kvCount == 0) return result; // don't flush if there are no entries
+
+ long smallestReadPoint = store.getSmallestReadPoint();
+ InternalScanner scanner = createScanner(snapshot, smallestReadPoint);
+ if (scanner == null) {
+ return result; // NULL scanner returned from coprocessor hooks means skip normal processing
+ }
+
+ // Let policy select flush method.
+ StripeFlushRequest req = this.policy.selectFlush(this.stripes, kvCount);
+
+ long flushedBytes = 0;
+ boolean success = false;
+ StripeMultiFileWriter mw = null;
+ try {
+ mw = req.createWriter(); // Writer according to the policy.
+ StripeMultiFileWriter.WriterFactory factory = createWriterFactory(tracker, kvCount);
+ StoreScanner storeScanner = (scanner instanceof StoreScanner) ? (StoreScanner)scanner : null;
+ mw.init(storeScanner, factory, store.getComparator());
+
+ synchronized (flushLock) {
+ flushedBytes = performFlush(scanner, mw, smallestReadPoint);
+ result = mw.commitWriters(cacheFlushSeqNum, false);
+ success = true;
+ }
+ } finally {
+ if (!success && (mw != null)) {
+ result.clear();
+ for (Path leftoverFile : mw.abortWriters()) {
+ try {
+ store.getFileSystem().delete(leftoverFile, false);
+ } catch (Exception e) {
+ LOG.error("Failed to delete a file after failed flush: " + e);
+ }
+ }
+ }
+ flushedSize.set(flushedBytes);
+ try {
+ scanner.close();
+ } catch (IOException ex) {
+ LOG.warn("Failed to close flush scanner, ignoring", ex);
+ }
+ }
+ return result;
+ }
+
+ private StripeMultiFileWriter.WriterFactory createWriterFactory(
+ final TimeRangeTracker tracker, final long kvCount) {
+ return new StripeMultiFileWriter.WriterFactory() {
+ @Override
+ public Writer createWriter() throws IOException {
+ StoreFile.Writer writer = store.createWriterInTmp(
+ kvCount, store.getFamily().getCompression(), false, true, true);
+ writer.setTimeRangeTracker(tracker);
+ return writer;
+ }
+ };
+ }
+
+ /** Stripe flush request wrapper that writes a non-striped file. */
+ public static class StripeFlushRequest {
+ @VisibleForTesting
+ public StripeMultiFileWriter createWriter() throws IOException {
+ StripeMultiFileWriter writer =
+ new StripeMultiFileWriter.SizeMultiWriter(1, Long.MAX_VALUE, OPEN_KEY, OPEN_KEY);
+ writer.setNoStripeMetadata();
+ return writer;
+ }
+ }
+
+ /** Stripe flush request wrapper based on boundaries. */
+ public static class BoundaryStripeFlushRequest extends StripeFlushRequest {
+ private final List<byte[]> targetBoundaries;
+
+ /** @param targetBoundaries New files should be written with these boundaries. */
+ public BoundaryStripeFlushRequest(List<byte[]> targetBoundaries) {
+ this.targetBoundaries = targetBoundaries;
+ }
+
+ @Override
+ public StripeMultiFileWriter createWriter() throws IOException {
+ return new StripeMultiFileWriter.BoundaryMultiWriter(targetBoundaries, null, null);
+ }
+ }
+
+ /** Stripe flush request wrapper based on size. */
+ public static class SizeStripeFlushRequest extends StripeFlushRequest {
+ private final int targetCount;
+ private final long targetKvs;
+
+ /**
+ * @param targetCount The maximum number of stripes to flush into.
+ * @param targetKvs The KV count of each segment. If targetKvs*targetCount is less than
+ * total number of kvs, all the overflow data goes into the last stripe.
+ */
+ public SizeStripeFlushRequest(int targetCount, long targetKvs) {
+ this.targetCount = targetCount;
+ this.targetKvs = targetKvs;
+ }
+
+ @Override
+ public StripeMultiFileWriter createWriter() throws IOException {
+ return new StripeMultiFileWriter.SizeMultiWriter(
+ this.targetCount, this.targetKvs, OPEN_KEY, OPEN_KEY);
+ }
+ }
+}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/compactions/StripeCompactionPolicy.java Wed Nov 6 01:49:36 2013
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.StoreUtils;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
+import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -75,9 +76,24 @@ public class StripeCompactionPolicy exte
if (si.getStripeCount() > 0) {
return new BoundaryStripeCompactionRequest(request, si.getStripeBoundaries());
}
- int initialCount = this.config.getInitialCount();
- long targetKvs = estimateTargetKvs(request.getFiles(), initialCount).getFirst();
- return new SplitStripeCompactionRequest(request, OPEN_KEY, OPEN_KEY, targetKvs);
+ Pair<Long, Integer> targetKvsAndCount = estimateTargetKvs(
+ request.getFiles(), this.config.getInitialCount());
+ return new SplitStripeCompactionRequest(
+ request, OPEN_KEY, OPEN_KEY, targetKvsAndCount.getSecond(), targetKvsAndCount.getFirst());
+ }
+
+ public StripeStoreFlusher.StripeFlushRequest selectFlush(
+ StripeInformationProvider si, int kvCount) {
+ if (this.config.isUsingL0Flush()) {
+ return new StripeStoreFlusher.StripeFlushRequest(); // L0 is used, return dumb request.
+ }
+ if (si.getStripeCount() == 0) {
+ // No stripes - start with the requisite count, derive KVs per stripe.
+ int initialCount = this.config.getInitialCount();
+ return new StripeStoreFlusher.SizeStripeFlushRequest(initialCount, kvCount / initialCount);
+ }
+ // There are stripes - do according to the boundaries.
+ return new StripeStoreFlusher.BoundaryStripeFlushRequest(si.getStripeBoundaries());
}
public StripeCompactionRequest selectCompaction(StripeInformationProvider si,
@@ -341,7 +357,7 @@ public class StripeCompactionPolicy exte
return totalSize;
}
- private static long getTotalFileSize(final Collection<StoreFile> candidates) {
+ public static long getTotalFileSize(final Collection<StoreFile> candidates) {
long totalSize = 0;
for (StoreFile storeFile : candidates) {
totalSize += storeFile.getReader().length();
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeCompactor.java Wed Nov 6 01:49:36 2013
@@ -125,7 +125,7 @@ public class TestStripeCompactor {
StripeCompactor sc = createCompactor(writers, input);
List<Path> paths =
sc.compact(createDummyRequest(), Arrays.asList(boundaries), majorFrom, majorTo);
- writers.verifyKvs(output, allFiles);
+ writers.verifyKvs(output, allFiles, true);
if (allFiles) {
assertEquals(output.length, paths.size());
writers.verifyBoundaries(boundaries);
@@ -162,7 +162,7 @@ public class TestStripeCompactor {
List<Path> paths = sc.compact(
createDummyRequest(), targetCount, targetSize, left, right, null, null);
assertEquals(output.length, paths.size());
- writers.verifyKvs(output, true);
+ writers.verifyKvs(output, true, true);
List<byte[]> boundaries = new ArrayList<byte[]>();
boundaries.add(left);
for (int i = 1; i < output.length; ++i) {
@@ -242,7 +242,8 @@ public class TestStripeCompactor {
}
// StoreFile.Writer has private ctor and is unwieldy, so this has to be convoluted.
- private static class StoreFileWritersCapture implements Answer<StoreFile.Writer> {
+ public static class StoreFileWritersCapture implements
+ Answer<StoreFile.Writer>, StripeMultiFileWriter.WriterFactory {
public static class Writer {
public ArrayList<KeyValue> kvs = new ArrayList<KeyValue>();
public TreeMap<byte[], byte[]> data = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
@@ -251,7 +252,7 @@ public class TestStripeCompactor {
private List<Writer> writers = new ArrayList<Writer>();
@Override
- public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
+ public StoreFile.Writer createWriter() throws IOException {
final Writer realWriter = new Writer();
writers.add(realWriter);
StoreFile.Writer writer = mock(StoreFile.Writer.class);
@@ -267,7 +268,12 @@ public class TestStripeCompactor {
return writer;
}
- public void verifyKvs(KeyValue[][] kvss, boolean allFiles) {
+ @Override
+ public StoreFile.Writer answer(InvocationOnMock invocation) throws Throwable {
+ return createWriter();
+ }
+
+ public void verifyKvs(KeyValue[][] kvss, boolean allFiles, boolean requireMetadata) {
if (allFiles) {
assertEquals(kvss.length, writers.size());
}
@@ -276,8 +282,13 @@ public class TestStripeCompactor {
KeyValue[] kvs = kvss[i];
if (kvs != null) {
Writer w = writers.get(i - skippedWriters);
- assertNotNull(w.data.get(STRIPE_START_KEY));
- assertNotNull(w.data.get(STRIPE_END_KEY));
+ if (requireMetadata) {
+ assertNotNull(w.data.get(STRIPE_START_KEY));
+ assertNotNull(w.data.get(STRIPE_END_KEY));
+ } else {
+ assertNull(w.data.get(STRIPE_START_KEY));
+ assertNull(w.data.get(STRIPE_END_KEY));
+ }
assertEquals(kvs.length, w.kvs.size());
for (int j = 0; j < kvs.length; ++j) {
assertEquals(kvs[j], w.kvs.get(j));
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStripeStoreFileManager.java Wed Nov 6 01:49:36 2013
@@ -83,7 +83,7 @@ public class TestStripeStoreFileManager
public void testInsertFilesIntoL0() throws Exception {
StripeStoreFileManager manager = createManager();
MockStoreFile sf = createFile();
- manager.insertNewFile(sf);
+ manager.insertNewFiles(al(sf));
assertEquals(1, manager.getStorefileCount());
Collection<StoreFile> filesForGet = manager.getFilesForScanOrGet(true, KEY_A, KEY_A);
assertEquals(1, filesForGet.size());
@@ -99,8 +99,8 @@ public class TestStripeStoreFileManager
@Test
public void testClearFiles() throws Exception {
StripeStoreFileManager manager = createManager();
- manager.insertNewFile(createFile());
- manager.insertNewFile(createFile());
+ manager.insertNewFiles(al(createFile()));
+ manager.insertNewFiles(al(createFile()));
manager.addCompactionResults(al(), al(createFile(OPEN_KEY, KEY_B),
createFile(KEY_B, OPEN_KEY)));
assertEquals(4, manager.getStorefileCount());
@@ -120,8 +120,8 @@ public class TestStripeStoreFileManager
public void testRowKeyBefore() throws Exception {
StripeStoreFileManager manager = createManager();
StoreFile l0File = createFile(), l0File2 = createFile();
- manager.insertNewFile(l0File);
- manager.insertNewFile(l0File2);
+ manager.insertNewFiles(al(l0File));
+ manager.insertNewFiles(al(l0File2));
// Get candidate files.
Iterator<StoreFile> sfs = manager.getCandidateFilesForRowKeyBefore(KV_B);
sfs.next();
@@ -174,8 +174,8 @@ public class TestStripeStoreFileManager
// If there are no stripes, should pick midpoint from the biggest file in L0.
MockStoreFile sf5 = createFile(5, 0);
sf5.splitPoint = new byte[1];
- manager.insertNewFile(sf5);
- manager.insertNewFile(createFile(1, 0));
+ manager.insertNewFiles(al(sf5));
+ manager.insertNewFiles(al(createFile(1, 0)));
assertEquals(sf5.splitPoint, manager.getSplitPoint());
// Same if there's one stripe but the biggest file is still in L0.
@@ -259,7 +259,7 @@ public class TestStripeStoreFileManager
// Populate one L0 file.
MockStoreFile sf0 = createFile();
- manager.insertNewFile(sf0);
+ manager.insertNewFiles(al(sf0));
verifyGetAndScanScenario(manager, null, null, sf0);
verifyGetAndScanScenario(manager, null, KEY_C, sf0);
verifyGetAndScanScenario(manager, KEY_B, null, sf0);
@@ -356,14 +356,11 @@ public class TestStripeStoreFileManager
}
@Test
- @SuppressWarnings("unchecked")
public void testAddingCompactionResults() throws Exception {
StripeStoreFileManager manager = createManager();
// First, add some L0 files and "compact" one with new stripe creation.
- StoreFile sf_L0_0a = createFile();
- StoreFile sf_L0_0b = createFile();
- manager.insertNewFile(sf_L0_0a);
- manager.insertNewFile(sf_L0_0b);
+ StoreFile sf_L0_0a = createFile(), sf_L0_0b = createFile();
+ manager.insertNewFiles(al(sf_L0_0a, sf_L0_0b));
// Try compacting with invalid new branches (gaps, overlaps) - no effect.
verifyInvalidCompactionScenario(manager, al(sf_L0_0a), al(createFile(OPEN_KEY, KEY_B)));
@@ -384,7 +381,7 @@ public class TestStripeStoreFileManager
StoreFile sf_L0_1 = createFile();
StoreFile sf_i2B_1 = createFile(OPEN_KEY, KEY_B);
StoreFile sf_B2C_1 = createFile(KEY_B, KEY_C);
- manager.insertNewFile(sf_L0_1);
+ manager.insertNewFiles(al(sf_L0_1));
manager.addCompactionResults(al(sf_L0_0b, sf_L0_1), al(sf_i2B_1, sf_B2C_1));
verifyAllFiles(manager, al(sf_i2B_0, sf_B2C_0, sf_C2i_0, sf_i2B_1, sf_B2C_1));
@@ -400,27 +397,21 @@ public class TestStripeStoreFileManager
manager.addCompactionResults(al(sf_i2B_0, sf_i2B_1), al(sf_i2B_3));
verifyAllFiles(manager, al(sf_B2C_0, sf_C2i_0, sf_B2C_1, sf_i2B_3));
- // Try to rebalance two stripes, but don't take all files from them - no effect.
+ // Rebalance two stripes.
StoreFile sf_B2D_4 = createFile(KEY_B, KEY_D);
StoreFile sf_D2i_4 = createFile(KEY_D, OPEN_KEY);
- ArrayList<StoreFile> compacted3 = al();
- verifyInvalidCompactionScenario(manager, al(sf_B2C_0, sf_C2i_0), al(sf_B2D_4, sf_D2i_4));
-
- // Rebalance two stripes correctly.
manager.addCompactionResults(al(sf_B2C_0, sf_C2i_0, sf_B2C_1), al(sf_B2D_4, sf_D2i_4));
verifyAllFiles(manager, al(sf_i2B_3, sf_B2D_4, sf_D2i_4));
// Split the first stripe.
StoreFile sf_i2A_5 = createFile(OPEN_KEY, KEY_A);
StoreFile sf_A2B_5 = createFile(KEY_A, KEY_B);
- ArrayList<StoreFile> compacted4 = al(createFile(OPEN_KEY, KEY_A), createFile(KEY_A, KEY_B));
manager.addCompactionResults(al(sf_i2B_3), al(sf_i2A_5, sf_A2B_5));
verifyAllFiles(manager, al(sf_B2D_4, sf_D2i_4, sf_i2A_5, sf_A2B_5));
// Split the middle stripe.
StoreFile sf_B2C_6 = createFile(KEY_B, KEY_C);
StoreFile sf_C2D_6 = createFile(KEY_C, KEY_D);
- ArrayList<StoreFile> compacted5 = al(createFile(KEY_B, KEY_C), createFile(KEY_C, KEY_D));
manager.addCompactionResults(al(sf_B2D_4), al(sf_B2C_6, sf_C2D_6));
verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_A2B_5, sf_B2C_6, sf_C2D_6));
@@ -429,14 +420,6 @@ public class TestStripeStoreFileManager
manager.addCompactionResults(al(sf_A2B_5, sf_B2C_6), al(sf_A2C_7));
verifyAllFiles(manager, al(sf_D2i_4, sf_i2A_5, sf_C2D_6, sf_A2C_7));
- // Try various range mismatch cases in replaced and new data - no effect.
- ArrayList<StoreFile> tmp = al(sf_A2C_7, sf_C2D_6); // [A, C)
- verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_B, KEY_C)));
- verifyInvalidCompactionScenario(manager, tmp, al(createFile(OPEN_KEY, KEY_D)));
- verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_A, OPEN_KEY)));
- verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_A, KEY_B)));
- verifyInvalidCompactionScenario(manager, tmp, al(createFile(KEY_B, keyAfter(KEY_B))));
-
// Merge lower half.
StoreFile sf_i2C_8 = createFile(OPEN_KEY, KEY_C);
manager.addCompactionResults(al(sf_i2A_5, sf_A2C_7), al(sf_i2C_8));
@@ -449,13 +432,39 @@ public class TestStripeStoreFileManager
}
@Test
+ public void testCompactionAndFlushConflict() throws Exception {
+ // Add file flush into stripes
+ StripeStoreFileManager sfm = createManager();
+ assertEquals(0, sfm.getStripeCount());
+ StoreFile sf_i2c = createFile(OPEN_KEY, KEY_C), sf_c2i = createFile(KEY_C, OPEN_KEY);
+ sfm.insertNewFiles(al(sf_i2c, sf_c2i));
+ assertEquals(2, sfm.getStripeCount());
+ // Now try to add conflicting flush - should throw.
+ StoreFile sf_i2d = createFile(OPEN_KEY, KEY_D), sf_d2i = createFile(KEY_D, OPEN_KEY);
+ sfm.insertNewFiles(al(sf_i2d, sf_d2i));
+ assertEquals(2, sfm.getStripeCount());
+ assertEquals(2, sfm.getLevel0Files().size());
+ verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_d2i, sf_c2i);
+ // Remove these files.
+ sfm.addCompactionResults(al(sf_i2d, sf_d2i), al());
+ assertEquals(0, sfm.getLevel0Files().size());
+ // Add another file to stripe; then "rebalance" stripes w/o it - the file, which was
+ // presumably flushed during compaction, should go to L0.
+ StoreFile sf_i2c_2 = createFile(OPEN_KEY, KEY_C);
+ sfm.insertNewFiles(al(sf_i2c_2));
+ sfm.addCompactionResults(al(sf_i2c, sf_c2i), al(sf_i2d, sf_d2i));
+ assertEquals(1, sfm.getLevel0Files().size());
+ verifyGetAndScanScenario(sfm, KEY_C, KEY_C, sf_i2d, sf_i2c_2);
+ }
+
+ @Test
public void testEmptyResultsForStripes() throws Exception {
// Test that we can compact L0 into a subset of stripes.
StripeStoreFileManager manager = createManager();
StoreFile sf0a = createFile();
StoreFile sf0b = createFile();
- manager.insertNewFile(sf0a);
- manager.insertNewFile(sf0b);
+ manager.insertNewFiles(al(sf0a));
+ manager.insertNewFiles(al(sf0b));
ArrayList<StoreFile> compacted = al(createFile(OPEN_KEY, KEY_B),
createFile(KEY_B, KEY_C), createFile(KEY_C, OPEN_KEY));
manager.addCompactionResults(al(sf0a), compacted);
@@ -491,7 +500,7 @@ public class TestStripeStoreFileManager
conf.setInt("hbase.hstore.blockingStoreFiles", limit);
StripeStoreFileManager sfm = createManager(al(), conf);
for (int i = 0; i < l0Files; ++i) {
- sfm.insertNewFile(createFile());
+ sfm.insertNewFiles(al(createFile()));
}
for (int i = 0; i < filesInStripe; ++i) {
ArrayList<StoreFile> stripe = new ArrayList<StoreFile>();
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java?rev=1539211&r1=1539210&r2=1539211&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/compactions/TestStripeCompactionPolicy.java Wed Nov 6 01:49:36 2013
@@ -38,16 +38,20 @@ import org.apache.commons.lang.NotImplem
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.regionserver.StoreConfigInformation;
import org.apache.hadoop.hbase.regionserver.StoreFile;
+import org.apache.hadoop.hbase.regionserver.StripeMultiFileWriter;
import org.apache.hadoop.hbase.regionserver.StripeStoreConfig;
import org.apache.hadoop.hbase.regionserver.StripeStoreFileManager;
+import org.apache.hadoop.hbase.regionserver.StripeStoreFlusher;
import org.apache.hadoop.hbase.regionserver.compactions.StripeCompactionPolicy.StripeInformationProvider;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ConcatenatedLists;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+import org.apache.hadoop.hbase.regionserver.TestStripeCompactor.StoreFileWritersCapture;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.mockito.ArgumentMatcher;
@@ -62,6 +66,12 @@ public class TestStripeCompactionPolicy
private static final byte[] KEY_C = Bytes.toBytes("ccc");
private static final byte[] KEY_D = Bytes.toBytes("ddd");
private static final byte[] KEY_E = Bytes.toBytes("eee");
+ private static final KeyValue KV_A = new KeyValue(KEY_A, 0L);
+ private static final KeyValue KV_B = new KeyValue(KEY_B, 0L);
+ private static final KeyValue KV_C = new KeyValue(KEY_C, 0L);
+ private static final KeyValue KV_D = new KeyValue(KEY_D, 0L);
+ private static final KeyValue KV_E = new KeyValue(KEY_E, 0L);
+
private static long defaultSplitSize = 18;
private static float defaultSplitCount = 1.8F;
@@ -69,6 +79,39 @@ public class TestStripeCompactionPolicy
private static long defaultTtl = 1000 * 1000;
@Test
+ public void testNoStripesFromFlush() throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ conf.setBoolean(StripeStoreConfig.FLUSH_TO_L0_KEY, true);
+ StripeCompactionPolicy policy = createPolicy(conf);
+ StripeInformationProvider si = createStripesL0Only(0, 0);
+
+ KeyValue[] input = new KeyValue[] { KV_A, KV_B, KV_C, KV_D, KV_E };
+ KeyValue[][] expected = new KeyValue[][] { input };
+ verifyFlush(policy, si, input, expected, null);
+ }
+
+ @Test
+ public void testOldStripesFromFlush() throws Exception {
+ StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
+ StripeInformationProvider si = createStripes(0, KEY_C, KEY_D);
+
+ KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
+ KeyValue[][] expected = new KeyValue[][] { new KeyValue[] { KV_B },
+ new KeyValue[] { KV_C, KV_C }, new KeyValue[] { KV_D, KV_E } };
+ verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, KEY_C, KEY_D, OPEN_KEY });
+ }
+
+ @Test
+ public void testNewStripesFromFlush() throws Exception {
+ StripeCompactionPolicy policy = createPolicy(HBaseConfiguration.create());
+ StripeInformationProvider si = createStripesL0Only(0, 0);
+ KeyValue[] input = new KeyValue[] { KV_B, KV_C, KV_C, KV_D, KV_E };
+ // Starts with one stripe; unlike flush results, must have metadata
+ KeyValue[][] expected = new KeyValue[][] { input };
+ verifyFlush(policy, si, input, expected, new byte[][] { OPEN_KEY, OPEN_KEY });
+ }
+
+ @Test
public void testSingleStripeCompaction() throws Exception {
// Create a special policy that only compacts single stripes, using standard methods.
Configuration conf = HBaseConfiguration.create();
@@ -424,6 +467,25 @@ public class TestStripeCompactionPolicy
dropDeletesMatcher(dropDeletes, start), dropDeletesMatcher(dropDeletes, end));
}
+ /** Verify arbitrary flush. */
+ protected void verifyFlush(StripeCompactionPolicy policy, StripeInformationProvider si,
+ KeyValue[] input, KeyValue[][] expected, byte[][] boundaries) throws IOException {
+ StoreFileWritersCapture writers = new StoreFileWritersCapture();
+ StripeStoreFlusher.StripeFlushRequest req = policy.selectFlush(si, input.length);
+ StripeMultiFileWriter mw = req.createWriter();
+ mw.init(null, writers, new KeyValue.KVComparator());
+ for (KeyValue kv : input) {
+ mw.append(kv);
+ }
+ boolean hasMetadata = boundaries != null;
+ mw.commitWriters(0, false);
+ writers.verifyKvs(expected, true, hasMetadata);
+ if (hasMetadata) {
+ writers.verifyBoundaries(boundaries);
+ }
+ }
+
+
private byte[] dropDeletesMatcher(Boolean dropDeletes, byte[] value) {
return dropDeletes == null ? any(byte[].class)
: (dropDeletes.booleanValue() ? aryEq(value) : isNull(byte[].class));