Posted to commits@hbase.apache.org by jd...@apache.org on 2014/04/25 23:03:12 UTC
svn commit: r1590144 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/mapreduce/
test/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/wal/ te...
Author: jdcryans
Date: Fri Apr 25 21:03:11 2014
New Revision: 1590144
URL: http://svn.apache.org/r1590144
Log:
HBASE-10958 [dataloss] Bulk loading with seqids can prevent some log entries from being replayed
Added:
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileTestUtil.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
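The scenario behind HBASE-10958, reconstructed from the patch below: bulk-loaded HFiles were stamped with a freshly incremented sequence id while older edits could still sit unflushed in the memstore. The replay code excluded bulk-loaded files when computing its cutoff (the getMaxSequenceId(false) call removed below), but that protection was lost once a compaction rewrote them into ordinary store files carrying the bulk load's high sequence id. A simplified sketch of the hazard (illustrative, not actual HBase source):

    // Memstore holds unflushed edits with seqids 10..20.
    long bulkLoadSeqId = sequenceId.incrementAndGet();   // returns 21
    // The bulk-loaded HFiles carry seqid 21 but are flagged isBulkLoadResult(),
    // so replay ignored them... until a compaction merged them into a normal
    // store file still carrying max seqid 21. A crash then replays from
    // cutoff 21 and skips WAL entries 10..20: dataloss.
    // The fix: flush before bulk loading, so every edit below the assigned
    // seqid is already durable in an HFile.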
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1590144&r1=1590143&r2=1590144&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Fri Apr 25 21:03:11 2014
@@ -387,6 +387,81 @@ public class HRegion implements HeapSize
ClassSize.OBJECT + 5 * Bytes.SIZEOF_BOOLEAN);
}
+ /**
+ * Objects of this class are created when flushing to describe the different states that the
+ * flush can end up in. The Result enum describes those states. The sequence id should only
+ * be specified if the flush was successful, and the failure message should only be specified
+ * if it didn't flush.
+ */
+ public static class FlushResult {
+ enum Result {
+ FLUSHED_NO_COMPACTION_NEEDED,
+ FLUSHED_COMPACTION_NEEDED,
+ // Special case where a flush didn't run because there's nothing in the memstores. Used when
+ // bulk loading to know when we can still load even if a flush didn't happen.
+ CANNOT_FLUSH_MEMSTORE_EMPTY,
+ CANNOT_FLUSH
+ // Be careful adding more to this enum; check the convenience methods below to make sure
+ // they stay correct
+ }
+
+ final Result result;
+ final String failureReason;
+ final long flushSequenceId;
+
+ /**
+ * Convenience constructor to use when the flush is successful; the failure message is set
+ * to null.
+ * @param result Expecting FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
+ * @param flushSequenceId Generated sequence id that comes right after the edits in the
+ * memstores.
+ */
+ FlushResult(Result result, long flushSequenceId) {
+ this(result, flushSequenceId, null);
+ assert result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
+ .FLUSHED_COMPACTION_NEEDED;
+ }
+
+ /**
+ * Convenience constructor to use when we cannot flush.
+ * @param result Expecting CANNOT_FLUSH_MEMSTORE_EMPTY or CANNOT_FLUSH.
+ * @param failureReason Reason why we couldn't flush.
+ */
+ FlushResult(Result result, String failureReason) {
+ this(result, -1, failureReason);
+ assert result == Result.CANNOT_FLUSH_MEMSTORE_EMPTY || result == Result.CANNOT_FLUSH;
+ }
+
+ /**
+ * Constructor with all the parameters.
+ * @param result Any of the Result values.
+ * @param flushSequenceId Generated sequence id if the memstores were flushed else -1.
+ * @param failureReason Reason why we couldn't flush, or null.
+ */
+ FlushResult(Result result, long flushSequenceId, String failureReason) {
+ this.result = result;
+ this.flushSequenceId = flushSequenceId;
+ this.failureReason = failureReason;
+ }
+
+ /**
+ * Convenience method, the equivalent of checking if result is
+ * FLUSHED_NO_COMPACTION_NEEDED or FLUSHED_COMPACTION_NEEDED.
+ * @return true if the memstores were flushed, else false.
+ */
+ public boolean isFlushSucceeded() {
+ return result == Result.FLUSHED_NO_COMPACTION_NEEDED || result == Result
+ .FLUSHED_COMPACTION_NEEDED;
+ }
+
+ /**
+ * Convenience method, the equivalent of checking if result is FLUSHED_COMPACTION_NEEDED.
+ * @return True if the flush requested a compaction, else false (false does not imply a
+ * flush ran at all).
+ */
+ public boolean isCompactionNeeded() {
+ return result == Result.FLUSHED_COMPACTION_NEEDED;
+ }
+ }
+
final WriteState writestate = new WriteState();
long memstoreFlushSize;
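Callers now branch on the returned FlushResult instead of a bare boolean. A minimal usage sketch, mirroring the bulk-load call site further down (names as in the patch):

    FlushResult fr = region.flushcache();
    if (fr.isFlushSucceeded()) {
      long seqId = fr.flushSequenceId;  // seqid just above the flushed edits
    } else if (fr.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
      // nothing to flush; safe to mint a fresh seqid instead
    } else {
      // flush skipped (closing, closed, or WAL closing); see fr.failureReason
    }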
@@ -706,16 +781,13 @@ public class HRegion implements HeapSize
for (int i = 0; i < htableDescriptor.getFamilies().size(); i++) {
Future<HStore> future = completionService.take();
HStore store = future.get();
-
this.stores.put(store.getColumnFamilyName().getBytes(), store);
- // Do not include bulk loaded files when determining seqIdForReplay
- long storeSeqIdForReplay = store.getMaxSequenceId(false);
+
+ long storeMaxSequenceId = store.getMaxSequenceId();
maxSeqIdInStores.put(store.getColumnFamilyName().getBytes(),
- storeSeqIdForReplay);
- // Include bulk loaded files when determining seqIdForAssignment
- long storeSeqIdForAssignment = store.getMaxSequenceId(true);
- if (maxSeqId == -1 || storeSeqIdForAssignment > maxSeqId) {
- maxSeqId = storeSeqIdForAssignment;
+ storeMaxSequenceId);
+ if (maxSeqId == -1 || storeMaxSequenceId > maxSeqId) {
+ maxSeqId = storeMaxSequenceId;
}
long maxStoreMemstoreTS = store.getMaxMemstoreTS();
if (maxStoreMemstoreTS > maxMemstoreTS) {
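Both values now share the same bulk-inclusive maximum: the per-family replay cutoff (maxSeqIdInStores) and the region's opening sequence id come from a single getMaxSequenceId() call. Replay will therefore skip WAL edits below a bulk-load seqid, which is only safe because bulk loads now flush first (see the bulkLoadHFiles change below). In sketch form (condensed from the hunk above):

    long storeMaxSequenceId = store.getMaxSequenceId();     // includes bulk-loaded files
    maxSeqIdInStores.put(familyName, storeMaxSequenceId);   // per-family replay cutoff
    maxSeqId = Math.max(maxSeqId, storeMaxSequenceId);      // region opening seqid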
@@ -1461,11 +1533,12 @@ public class HRegion implements HeapSize
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
- public boolean flushcache() throws IOException {
+ public FlushResult flushcache() throws IOException {
// fail-fast instead of waiting on the lock
if (this.closing.get()) {
- LOG.debug("Skipping flush on " + this + " because closing");
- return false;
+ String msg = "Skipping flush on " + this + " because closing";
+ LOG.debug(msg);
+ return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
}
MonitoredTask status = TaskMonitor.get().createStatus("Flushing " + this);
status.setStatus("Acquiring readlock on region");
@@ -1473,9 +1546,10 @@ public class HRegion implements HeapSize
lock.readLock().lock();
try {
if (this.closed.get()) {
- LOG.debug("Skipping flush on " + this + " because closed");
- status.abort("Skipped: closed");
- return false;
+ String msg = "Skipping flush on " + this + " because closed";
+ LOG.debug(msg);
+ status.abort(msg);
+ return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
}
if (coprocessorHost != null) {
status.setStatus("Running coprocessor pre-flush hooks");
@@ -1494,14 +1568,15 @@ public class HRegion implements HeapSize
+ ", flushing=" + writestate.flushing + ", writesEnabled="
+ writestate.writesEnabled);
}
- status.abort("Not flushing since "
+ String msg = "Not flushing since "
+ (writestate.flushing ? "already flushing"
- : "writes not enabled"));
- return false;
+ : "writes not enabled");
+ status.abort(msg);
+ return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
}
}
try {
- boolean result = internalFlushcache(status);
+ FlushResult fs = internalFlushcache(status);
if (coprocessorHost != null) {
status.setStatus("Running post-flush coprocessor hooks");
@@ -1509,7 +1584,7 @@ public class HRegion implements HeapSize
}
status.markComplete("Flush successful");
- return result;
+ return fs;
} finally {
synchronized (writestate) {
writestate.flushing = false;
@@ -1578,13 +1653,13 @@ public class HRegion implements HeapSize
* <p> This method may block for some time.
* @param status
*
- * @return true if the region needs compacting
+ * @return object describing the flush's state
*
* @throws IOException general io exceptions
* @throws DroppedSnapshotException Thrown when replay of hlog is required
* because a Snapshot was not properly persisted.
*/
- protected boolean internalFlushcache(MonitoredTask status)
+ protected FlushResult internalFlushcache(MonitoredTask status)
throws IOException {
return internalFlushcache(this.log, -1, status);
}
@@ -1598,7 +1673,7 @@ public class HRegion implements HeapSize
* @throws IOException
* @see #internalFlushcache(MonitoredTask)
*/
- protected boolean internalFlushcache(
+ protected FlushResult internalFlushcache(
final HLog wal, final long myseqid, MonitoredTask status)
throws IOException {
if (this.rsServices != null && this.rsServices.isAborted()) {
@@ -1609,7 +1684,7 @@ public class HRegion implements HeapSize
// Clear flush flag.
// If nothing to flush, return and avoid logging start/stop flush.
if (this.memstoreSize.get() <= 0) {
- return false;
+ return new FlushResult(FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY, "Nothing to flush");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Started memstore flush for " + this +
@@ -1644,9 +1719,10 @@ public class HRegion implements HeapSize
// check if it is not closing.
if (wal != null) {
if (!wal.startCacheFlush(this.getRegionInfo().getEncodedNameAsBytes())) {
- status.setStatus("Flush will not be started for ["
- + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.");
- return false;
+ String msg = "Flush will not be started for ["
+ + this.getRegionInfo().getEncodedName() + "] - because the WAL is closing.";
+ status.setStatus(msg);
+ return new FlushResult(FlushResult.Result.CANNOT_FLUSH, msg);
}
flushSeqId = this.sequenceId.incrementAndGet();
} else {
@@ -1762,7 +1838,8 @@ public class HRegion implements HeapSize
status.setStatus(msg);
this.recentFlushes.add(new Pair<Long,Long>(time/1000, totalFlushableSize));
- return compactionRequested;
+ return new FlushResult(compactionRequested ? FlushResult.Result.FLUSHED_COMPACTION_NEEDED :
+ FlushResult.Result.FLUSHED_NO_COMPACTION_NEEDED, flushSeqId);
}
//////////////////////////////////////////////////////////////////////////////
@@ -3563,6 +3640,22 @@ public class HRegion implements HeapSize
return false;
}
+ long seqId = -1;
+ // We need to assign a sequence ID that sits between the edits just flushed and any future
+ // memstore edits, in order to preserve the guarantee that all the edits lower than the
+ // highest sequence ID across the HFiles are flushed on disk. See HBASE-10958.
+ if (assignSeqId) {
+ FlushResult fs = this.flushcache();
+ if (fs.isFlushSucceeded()) {
+ seqId = fs.flushSequenceId;
+ } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
+ seqId = this.sequenceId.incrementAndGet();
+ } else {
+ throw new IOException("Could not bulk load with an assigned sequential ID because the " +
+ "flush didn't run. Reason for not flushing: " + fs.failureReason);
+ }
+ }
+
for (Pair<byte[], String> p : familyPaths) {
byte[] familyName = p.getFirst();
String path = p.getSecond();
@@ -3572,7 +3665,7 @@ public class HRegion implements HeapSize
if(bulkLoadListener != null) {
finalPath = bulkLoadListener.prepareBulkLoad(familyName, path);
}
- store.bulkLoadHFile(finalPath, assignSeqId ? this.sequenceId.incrementAndGet() : -1);
+ store.bulkLoadHFile(finalPath, seqId);
if(bulkLoadListener != null) {
bulkLoadListener.doneBulkLoad(familyName, path);
}
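Condensed, the new bulk-load-with-seqid flow reads as follows (a sketch of the hunk above, not verbatim source):

    FlushResult fs = this.flushcache();              // push memstore edits into HFiles
    long seqId;
    if (fs.isFlushSucceeded()) {
      seqId = fs.flushSequenceId;                    // sits just above the flushed edits
    } else if (fs.result == FlushResult.Result.CANNOT_FLUSH_MEMSTORE_EMPTY) {
      seqId = this.sequenceId.incrementAndGet();     // memstore empty, mint a fresh id
    } else {
      throw new IOException("Could not bulk load with an assigned sequential ID because "
          + "the flush didn't run. Reason for not flushing: " + fs.failureReason);
    }
    // Every bulk-loaded file in this request is then stamped with the same seqId
    // (previously each file got its own sequenceId.incrementAndGet()):
    store.bulkLoadHFile(finalPath, seqId);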
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java?rev=1590144&r1=1590143&r2=1590144&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java Fri Apr 25 21:03:11 2014
@@ -433,8 +433,8 @@ public class HStore implements Store {
/**
* @return The maximum sequence id in all store files. Used for log replay.
*/
- long getMaxSequenceId(boolean includeBulkFiles) {
- return StoreFile.getMaxSequenceIdInList(this.getStorefiles(), includeBulkFiles);
+ long getMaxSequenceId() {
+ return StoreFile.getMaxSequenceIdInList(this.getStorefiles());
}
@Override
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java?rev=1590144&r1=1590143&r2=1590144&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MemStoreFlusher.java Fri Apr 25 21:03:11 2014
@@ -475,7 +475,7 @@ class MemStoreFlusher implements FlushRe
lock.readLock().lock();
try {
notifyFlushRequest(region, emergencyFlush);
- boolean shouldCompact = region.flushcache();
+ boolean shouldCompact = region.flushcache().isCompactionNeeded();
// We just want to check the size
boolean shouldSplit = region.checkSplit() != null;
if (shouldSplit) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java?rev=1590144&r1=1590143&r2=1590144&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java Fri Apr 25 21:03:11 2014
@@ -978,7 +978,7 @@ public class RSRpcServices implements HB
}
FlushRegionResponse.Builder builder = FlushRegionResponse.newBuilder();
if (shouldFlush) {
- boolean result = region.flushcache();
+ boolean result = region.flushcache().isCompactionNeeded();
if (result) {
regionServer.compactSplitThread.requestSystemCompaction(region,
"Compaction through user triggered flush");
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1590144&r1=1590143&r2=1590144&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Fri Apr 25 21:03:11 2014
@@ -300,21 +300,15 @@ public class StoreFile {
/**
* Return the highest sequence ID found across all storefiles in
- * the given list. Store files that were created by a mapreduce
- * bulk load are ignored, as they do not correspond to any edit
- * log items.
+ * the given list.
* @param sfs
- * @param includeBulkLoadedFiles
* @return 0 if no store files are provided or this is a Store that
* does not yet have any store files.
*/
- public static long getMaxSequenceIdInList(Collection<StoreFile> sfs,
- boolean includeBulkLoadedFiles) {
+ public static long getMaxSequenceIdInList(Collection<StoreFile> sfs) {
long max = 0;
for (StoreFile sf : sfs) {
- if (includeBulkLoadedFiles || !sf.isBulkLoadResult()) {
- max = Math.max(max, sf.getMaxSequenceId());
- }
+ max = Math.max(max, sf.getMaxSequenceId());
}
return max;
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java?rev=1590144&r1=1590143&r2=1590144&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java Fri Apr 25 21:03:11 2014
@@ -32,21 +32,16 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.io.compress.Compression;
-import org.apache.hadoop.hbase.io.compress.Compression.Algorithm;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -69,10 +64,6 @@ public class TestLoadIncrementalHFiles {
Bytes.toBytes("ppp")
};
- public static int BLOCKSIZE = 64*1024;
- public static Algorithm COMPRESSION =
- Compression.Algorithm.NONE;
-
static HBaseTestingUtility util = new HBaseTestingUtility();
@BeforeClass
@@ -149,7 +140,7 @@ public class TestLoadIncrementalHFiles {
for (byte[][] range : hfileRanges) {
byte[] from = range[0];
byte[] to = range[1];
- createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+ HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+ hfileIdx++), FAMILY, QUALIFIER, from, to, 1000);
}
int expectedRows = hfileIdx * 1000;
@@ -189,7 +180,7 @@ public class TestLoadIncrementalHFiles {
for (byte[][] range : hFileRanges) {
byte[] from = range[0];
byte[] to = range[1];
- createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+ HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+ hFileIdx++), FAMILY, QUALIFIER, from, to, 1000);
}
@@ -228,7 +219,7 @@ public class TestLoadIncrementalHFiles {
FileSystem fs = util.getTestFileSystem();
Path testIn = new Path(dir, "testhfile");
HColumnDescriptor familyDesc = new HColumnDescriptor(FAMILY);
- createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
+ HFileTestUtil.createHFile(util.getConfiguration(), fs, testIn, FAMILY, QUALIFIER,
Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
Path bottomOut = new Path(dir, "bottom.out");
@@ -261,40 +252,6 @@ public class TestLoadIncrementalHFiles {
return count;
}
-
- /**
- * Create an HFile with the given number of rows between a given
- * start key and end key.
- * TODO put me in an HFileTestUtil or something?
- */
- static void createHFile(
- Configuration configuration,
- FileSystem fs, Path path,
- byte[] family, byte[] qualifier,
- byte[] startKey, byte[] endKey, int numRows) throws IOException
- {
- HFileContext meta = new HFileContextBuilder()
- .withBlockSize(BLOCKSIZE)
- .withCompression(COMPRESSION)
- .build();
- HFile.Writer writer = HFile.getWriterFactory(configuration, new CacheConfig(configuration))
- .withPath(fs, path)
- .withFileContext(meta)
- .create();
- long now = System.currentTimeMillis();
- try {
- // subtract 2 since iterateOnSplits doesn't include boundary keys
- for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) {
- KeyValue kv = new KeyValue(key, family, qualifier, now, key);
- writer.append(kv);
- }
- } finally {
- writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
- Bytes.toBytes(System.currentTimeMillis()));
- writer.close();
- }
- }
-
private void addStartEndKeysForTest(TreeMap<byte[], Integer> map, byte[] first, byte[] last) {
Integer value = map.containsKey(first)?map.get(first):0;
map.put(first, value+1);
@@ -370,7 +327,7 @@ public class TestLoadIncrementalHFiles {
byte[] from = Bytes.toBytes("begin");
byte[] to = Bytes.toBytes("end");
for (int i = 0; i <= MAX_FILES_PER_REGION_PER_FAMILY; i++) {
- createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+ HFileTestUtil.createHFile(util.getConfiguration(), fs, new Path(familyDir, "hfile_"
+ i), FAMILY, QUALIFIER, from, to, 1000);
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java?rev=1590144&r1=1590143&r2=1590144&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java Fri Apr 25 21:03:11 2014
@@ -3435,7 +3435,7 @@ public class TestHRegion {
@Override
public void doAnAction() throws Exception {
- if (region.flushcache()) {
+ if (region.flushcache().isCompactionNeeded()) {
++flushesSinceCompact;
}
// Compact regularly to avoid creating too many files and exceeding
@@ -4292,6 +4292,42 @@ public class TestHRegion {
}
}
+ /**
+ * Test that we get the expected flush results back
+ * @throws IOException
+ */
+ @Test
+ public void testFlushResult() throws IOException {
+ String method = name.getMethodName();
+ byte[] tableName = Bytes.toBytes(method);
+ byte[] family = Bytes.toBytes("family");
+
+ this.region = initHRegion(tableName, method, family);
+
+ // empty memstore, flush doesn't run
+ HRegion.FlushResult fr = region.flushcache();
+ assertFalse(fr.isFlushSucceeded());
+ assertFalse(fr.isCompactionNeeded());
+
+ // Flush enough files to get up to the threshold, doesn't need compactions
+ for (int i = 0; i < 2; i++) {
+ Put put = new Put(tableName).add(family, family, tableName);
+ region.put(put);
+ fr = region.flushcache();
+ assertTrue(fr.isFlushSucceeded());
+ assertFalse(fr.isCompactionNeeded());
+ }
+
+ // Two flushes after the threshold, compactions are needed
+ for (int i = 0; i < 2; i++) {
+ Put put = new Put(tableName).add(family, family, tableName);
+ region.put(put);
+ fr = region.flushcache();
+ assertTrue(fr.isFlushSucceeded());
+ assertTrue(fr.isCompactionNeeded());
+ }
+ }
+
private Configuration initSplit() {
// Always compact if there is more than one store file.
CONF.setInt("hbase.hstore.compactionThreshold", 2);
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java?rev=1590144&r1=1590143&r2=1590144&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplay.java Fri Apr 25 21:03:11 2014
@@ -57,9 +57,6 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.io.hfile.HFile;
-import org.apache.hadoop.hbase.io.hfile.HFileContext;
-import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
import org.apache.hadoop.hbase.master.HMaster;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.regionserver.DefaultStoreEngine;
@@ -77,6 +74,7 @@ import org.apache.hadoop.hbase.util.Byte
import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HFileTestUtil;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
@@ -319,7 +317,7 @@ public class TestWALReplay {
throws IOException, SecurityException, IllegalArgumentException,
NoSuchFieldException, IllegalAccessException, InterruptedException {
final TableName tableName =
- TableName.valueOf("testReplayEditsWrittenViaHRegion");
+ TableName.valueOf("testRegionMadeOfBulkLoadedFilesOnly");
final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
deleteDir(basedir);
@@ -329,21 +327,22 @@ public class TestWALReplay {
HRegion.closeHRegion(region2);
HLog wal = createWAL(this.conf);
HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
- Path f = new Path(basedir, "hfile");
- HFileContext context = new HFileContextBuilder().build();
- HFile.Writer writer =
- HFile.getWriterFactoryNoCache(conf).withPath(fs, f)
- .withFileContext(context).create();
+
byte [] family = htd.getFamilies().iterator().next().getName();
- byte [] row = tableName.getName();
- writer.append(new KeyValue(row, family, family, row));
- writer.close();
+ Path f = new Path(basedir, "hfile");
+ HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(""),
+ Bytes.toBytes("z"), 10);
List <Pair<byte[],String>> hfs= new ArrayList<Pair<byte[],String>>(1);
hfs.add(Pair.newPair(family, f.toString()));
region.bulkLoadHFiles(hfs, true);
+
// Add an edit so something is in the WAL
+ byte [] row = tableName.getName();
region.put((new Put(row)).add(family, family, family));
wal.sync();
+ final int rowsInsertedCount = 11;
+
+ assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
// Now 'crash' the region by stealing its wal
final Configuration newConf = HBaseConfiguration.create(this.conf);
@@ -358,6 +357,7 @@ public class TestWALReplay {
hbaseRootDir, hri, htd, wal2);
long seqid2 = region2.getOpenSeqNum();
assertTrue(seqid2 > -1);
+ assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
// I can't close wal1. It's been appropriated when we split.
region2.close();
@@ -368,6 +368,78 @@ public class TestWALReplay {
}
/**
+ * HRegion test case that is made of a major compacted HFile (created with three bulk loaded
+ * files) and an edit in the memstore.
+ * This is for HBASE-10958 "[dataloss] Bulk loading with seqids can prevent some log entries
+ * from being replayed"
+ * @throws IOException
+ * @throws IllegalAccessException
+ * @throws NoSuchFieldException
+ * @throws IllegalArgumentException
+ * @throws SecurityException
+ */
+ @Test
+ public void testCompactedBulkLoadedFiles()
+ throws IOException, SecurityException, IllegalArgumentException,
+ NoSuchFieldException, IllegalAccessException, InterruptedException {
+ final TableName tableName =
+ TableName.valueOf("testCompactedBulkLoadedFiles");
+ final HRegionInfo hri = createBasic3FamilyHRegionInfo(tableName);
+ final Path basedir = new Path(this.hbaseRootDir, tableName.getNameAsString());
+ deleteDir(basedir);
+ final HTableDescriptor htd = createBasic3FamilyHTD(tableName);
+ HRegion region2 = HRegion.createHRegion(hri,
+ hbaseRootDir, this.conf, htd);
+ HRegion.closeHRegion(region2);
+ HLog wal = createWAL(this.conf);
+ HRegion region = HRegion.openHRegion(hri, htd, wal, this.conf);
+
+ // Add an edit so something is in the WAL
+ byte [] row = tableName.getName();
+ byte [] family = htd.getFamilies().iterator().next().getName();
+ region.put((new Put(row)).add(family, family, family));
+ wal.sync();
+
+ List <Pair<byte[],String>> hfs= new ArrayList<Pair<byte[],String>>(1);
+ for (int i = 0; i < 3; i++) {
+ Path f = new Path(basedir, "hfile"+i);
+ HFileTestUtil.createHFile(this.conf, fs, f, family, family, Bytes.toBytes(i + "00"),
+ Bytes.toBytes(i + "50"), 10);
+ hfs.add(Pair.newPair(family, f.toString()));
+ }
+ region.bulkLoadHFiles(hfs, true);
+ final int rowsInsertedCount = 31;
+ assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
+
+ // major compact to turn all the bulk loaded files into one normal file
+ region.compactStores(true);
+ assertEquals(rowsInsertedCount, getScannedCount(region.getScanner(new Scan())));
+
+ // Now 'crash' the region by stealing its wal
+ final Configuration newConf = HBaseConfiguration.create(this.conf);
+ User user = HBaseTestingUtility.getDifferentUser(newConf,
+ tableName.getNameAsString());
+ user.runAs(new PrivilegedExceptionAction() {
+ public Object run() throws Exception {
+ runWALSplit(newConf);
+ HLog wal2 = createWAL(newConf);
+
+ HRegion region2 = HRegion.openHRegion(newConf, FileSystem.get(newConf),
+ hbaseRootDir, hri, htd, wal2);
+ long seqid2 = region2.getOpenSeqNum();
+ assertTrue(seqid2 > -1);
+ assertEquals(rowsInsertedCount, getScannedCount(region2.getScanner(new Scan())));
+
+ // I can't close wal1. It's been appropriated when we split.
+ region2.close();
+ wal2.closeAndDelete();
+ return null;
+ }
+ });
+ }
+
+
+ /**
* Test writing edits into an HRegion, closing it, splitting logs, opening
* Region again. Verify seqids.
* @throws IOException
@@ -736,14 +808,14 @@ public class TestWALReplay {
try {
final HRegion region =
new HRegion(basedir, newWal, newFS, newConf, hri, htd, null) {
- protected boolean internalFlushcache(
+ protected FlushResult internalFlushcache(
final HLog wal, final long myseqid, MonitoredTask status)
throws IOException {
LOG.info("InternalFlushCache Invoked");
- boolean b = super.internalFlushcache(wal, myseqid,
+ FlushResult fs = super.internalFlushcache(wal, myseqid,
Mockito.mock(MonitoredTask.class));
flushcount.incrementAndGet();
- return b;
+ return fs;
};
};
long seqid = region.initialize();
Added: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileTestUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileTestUtil.java?rev=1590144&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileTestUtil.java (added)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/util/HFileTestUtil.java Fri Apr 25 21:03:11 2014
@@ -0,0 +1,66 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.util;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
+
+import java.io.IOException;
+
+/**
+ * Utility class for HFile-related testing.
+ */
+public class HFileTestUtil {
+
+ /**
+ * Create an HFile with the given number of rows between a given
+ * start key and end key.
+ */
+ public static void createHFile(
+ Configuration configuration,
+ FileSystem fs, Path path,
+ byte[] family, byte[] qualifier,
+ byte[] startKey, byte[] endKey, int numRows) throws IOException
+ {
+ HFileContext meta = new HFileContextBuilder().build();
+ HFile.Writer writer = HFile.getWriterFactory(configuration, new CacheConfig(configuration))
+ .withPath(fs, path)
+ .withFileContext(meta)
+ .create();
+ long now = System.currentTimeMillis();
+ try {
+ // subtract 2 since iterateOnSplits doesn't include boundary keys
+ for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) {
+ KeyValue kv = new KeyValue(key, family, qualifier, now, key);
+ writer.append(kv);
+ }
+ } finally {
+ writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+ Bytes.toBytes(System.currentTimeMillis()));
+ writer.close();
+ }
+ }
+}
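For reference, a hypothetical call of the extracted utility, matching how the tests above use it (util is an HBaseTestingUtility; family, qualifier, and path values are illustrative):

    FileSystem fs = util.getTestFileSystem();
    Path hfile = new Path(util.getDataTestDir(), "hfile_0");
    HFileTestUtil.createHFile(util.getConfiguration(), fs, hfile,
        Bytes.toBytes("family"), Bytes.toBytes("qualifier"),
        Bytes.toBytes("aaa"), Bytes.toBytes("zzz"), 1000);
    // Writes 1000 rows keyed between "aaa" and "zzz"; the finally block stamps
    // BULKLOAD_TIME_KEY so the file is recognized as a bulk load product.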