You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/02/21 20:18:26 UTC
svn commit: r1570674 - in /hbase/branches/0.89-fb/src:
main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
Author: liyin
Date: Fri Feb 21 19:18:26 2014
New Revision: 1570674
URL: http://svn.apache.org/r1570674
Log:
[HBASE-10578] [0.89-fb] Choose the correct KV from multiple scanners
Author: aaiyer
Summary:
When multiple scanners have the same KV, HBase should pick the "newest" one.
However, if the "current" scanner has the same KV, it seems to get
preference over the other scanners in the heap -- THIS IS WRONG.
This issue has been causing Relja problems. The diff adds a unit test to make
sure that bulk loading behaves correctly, and fixes the issue.
Test Plan: Added a unit test
Reviewers: gauravm, liyintang, rshroff
Reviewed By: rshroff
CC: hbase-dev@, rpetrovic, daviddeng
Differential Revision: https://phabricator.fb.com/D1183013
Modified:
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=1570674&r1=1570673&r2=1570674&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Fri Feb 21 19:18:26 2014
@@ -102,7 +102,7 @@ public class KeyValueHeap extends NonLaz
} else {
KeyValueScanner topScanner = this.heap.peek();
if (topScanner == null ||
- this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
+ this.comparator.compare(this.current, topScanner) > 0) {
this.heap.add(this.current);
this.current = pollRealKV();
}
@@ -381,7 +381,7 @@ public class KeyValueHeap extends NonLaz
// Compare the current scanner to the next scanner. We try to avoid
// putting the current one back into the heap if possible.
KeyValue nextKV = nextEarliestScanner.peek();
- if (nextKV == null || comparator.compare(curKV, nextKV) <= 0) {
+ if (nextKV == null || comparator.compare(kvScanner, nextEarliestScanner) <= 0) {
// We already have the scanner with the earliest KV, so return it.
return kvScanner;
}
Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java?rev=1570674&r1=1570673&r2=1570674&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java Fri Feb 21 19:18:26 2014
@@ -24,18 +24,20 @@ import static org.junit.Assert.assertTru
import java.io.IOException;
import java.util.List;
+import java.util.NavigableMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -43,8 +45,11 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.regionserver.StoreFile;
import org.apache.hadoop.hbase.regionserver.TestStore;
import org.apache.hadoop.hbase.regionserver.TestStoreFile;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
import org.junit.Test;
+import org.mortbay.log.Log;
/**
* Test cases for the "load" half of the HFileOutputFormat bulk load
@@ -154,6 +159,103 @@ public class TestLoadIncrementalHFiles {
}
}
+ @Test
+ public void testLoadWithSeqNumNoFlush() throws Exception {
+ testLoadWithSeqNum(false);
+ }
+
+ @Test
+ public void testLoadWithSeqNumWithFlush() throws Exception {
+ testLoadWithSeqNum(true);
+ }
+
+ private void testLoadWithSeqNum(boolean flush) throws Exception {
+ String testName = "bulkLoadSensibility";
+ Path dir1 = util.getTestDir(testName + "1");
+ Path dir2 = util.getTestDir(testName + "2");
+ FileSystem fs = util.getTestFileSystem();
+ dir1 = dir1.makeQualified(fs);
+ dir2 = dir2.makeQualified(fs);
+ Path familyDir1 = new Path(dir1, Bytes.toString(FAMILY));
+ Path familyDir2 = new Path(dir2, Bytes.toString(FAMILY));
+ byte[] row = Bytes.toBytes("key");
+ long timestamp = 1000;
+
+ int hfileIdx = 0;
+ Configuration config = util.getConfiguration();
+ Log.info("Creating first bulkloaded file");
+ createHFileForKV(config, fs, new Path(familyDir1, "hfile_"
+ + hfileIdx++), row, FAMILY, QUALIFIER, timestamp, Bytes.toBytes("bulkValue"));
+
+ util.startMiniCluster();
+ try {
+ HBaseAdmin admin = new HBaseAdmin(config);
+ HTableDescriptor htd = new HTableDescriptor(TABLE);
+ htd.addFamily(new HColumnDescriptor(FAMILY));
+ // Do not worry about splitting the keys
+ admin.createTable(htd);
+
+ HTable table = new HTable(config, TABLE);
+ Log.info("Waiting for HTable to be available");
+ util.waitTableAvailable(TABLE, 30000);
+
+ // Do a dummy put to increase the hlog sequence number
+ Log.info("Doing a htable.put()");
+ Put put = new Put(row);
+ put.add(FAMILY, QUALIFIER, timestamp, Bytes.toBytes("singlePutValue"));
+ table.put(put);
+
+ if (flush) {
+ // flush the table
+ Log.info("Flushing the table ");
+ table.flushRegionForRow(row, 0);
+ }
+
+ config.setBoolean(LoadIncrementalHFiles.ASSIGN_SEQ_IDS, false);
+ Log.info("Loading the first bulkload file");
+ LoadIncrementalHFiles loader1 = new LoadIncrementalHFiles(
+ config);
+ loader1.doBulkLoad(dir1, table);
+
+ Log.info("Getting after the first bulkloaded file");
+ Get get = new Get(row);
+ get.addColumn(FAMILY, QUALIFIER);
+ get.setTimeStamp(timestamp);
+
+ Result result = table.get(get);
+ NavigableMap<Long, byte[]> navigableMap =
+ result.getMap().get(FAMILY).get(QUALIFIER);
+ assertEquals("singlePutValue", Bytes.toString(navigableMap.get(timestamp)));
+
+ Log.info("Creating second bulkload file");
+ createHFileForKV(config, fs, new Path(familyDir2, "hfile_"
+ + hfileIdx++), row, FAMILY, QUALIFIER, timestamp, Bytes.toBytes("bulkValue2"));
+
+ config.setBoolean(LoadIncrementalHFiles.ASSIGN_SEQ_IDS, true);
+ LoadIncrementalHFiles loader2 = new LoadIncrementalHFiles(
+ config);
+
+ Log.info("Loading the second bulkload file");
+ loader2.doBulkLoad(dir2, table);
+
+ Log.info("Verifying get after 2nd bulk load");
+ result = table.get(get);
+ navigableMap =
+ result.getMap().get(FAMILY).get(QUALIFIER);
+ if (flush) {
+ // no value in memstore
+ // the latest bulk load file should win
+ assertEquals("bulkValue2", Bytes.toString(navigableMap.get(timestamp)));
+ } else {
+ // The value in memstore wins
+ assertEquals("singlePutValue", Bytes.toString(navigableMap.get(timestamp)));
+ }
+ } finally {
+ util.shutdownMiniCluster();
+ }
+
+ }
+
private void verifyAssignedSequenceNumber(String testName,
byte[][][] hfileRanges, boolean nonZero) throws Exception {
Path dir = util.getTestDir(testName);
@@ -262,16 +364,47 @@ public class TestLoadIncrementalHFiles {
.withCompression(COMPRESSION)
.withComparator(KeyValue.KEY_COMPARATOR)
.create();
+ TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
long now = System.currentTimeMillis();
try {
// subtract 2 since iterateOnSplits doesn't include boundary keys
for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) {
KeyValue kv = new KeyValue(key, family, qualifier, now, key);
writer.append(kv);
+ timeRangeTracker.includeTimestamp(kv);
}
} finally {
writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
Bytes.toBytes(System.currentTimeMillis()));
+ writer.appendFileInfo(StoreFile.TIMERANGE_KEY,
+ WritableUtils.toByteArray(timeRangeTracker));
+ writer.close();
+ }
+ }
+
+ static void createHFileForKV(
+ Configuration conf,
+ FileSystem fs, Path path,
+ byte[] key,
+ byte[] family, byte[] qualifier,
+ long timestamp, byte[] value) throws IOException
+ {
+ HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
+ .withPath(fs, path)
+ .withBlockSize(BLOCKSIZE)
+ .withCompression(COMPRESSION)
+ .withComparator(KeyValue.KEY_COMPARATOR)
+ .create();
+ TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
+ try {
+ KeyValue kv = new KeyValue(key, family, qualifier, timestamp, value);
+ writer.append(kv);
+ timeRangeTracker.includeTimestamp(kv);
+ } finally {
+ writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+ Bytes.toBytes(System.currentTimeMillis()));
+ writer.appendFileInfo(StoreFile.TIMERANGE_KEY,
+ WritableUtils.toByteArray(timeRangeTracker));
writer.close();
}
}