You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2014/09/08 16:49:27 UTC
git commit: HBASE-11772 Bulk load mvcc and seqId issues with native hfiles (Jerry He)
Repository: hbase
Updated Branches:
refs/heads/branch-1 ecb015d9a -> fd5b139a6
HBASE-11772 Bulk load mvcc and seqId issues with native hfiles (Jerry He)
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fd5b139a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fd5b139a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fd5b139a
Branch: refs/heads/branch-1
Commit: fd5b139a6f60970f575499d5dfb0aa762c590696
Parents: ecb015d
Author: Ted Yu <te...@apache.org>
Authored: Mon Sep 8 14:49:16 2014 +0000
Committer: Ted Yu <te...@apache.org>
Committed: Mon Sep 8 14:49:16 2014 +0000
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/StoreFile.java | 24 +++++--
.../hbase/regionserver/StoreFileScanner.java | 2 +-
.../regionserver/TestScannerWithBulkload.java | 76 ++++++++++++++++++--
3 files changed, 91 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/fd5b139a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 27c64f0..6a45c47 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -315,18 +315,31 @@ public class StoreFile {
}
/**
- * @return true if this storefile was created by HFileOutputFormat
- * for a bulk load.
+ * Check if this storefile was created by bulk load.
+ * When a hfile is bulk loaded into HBase, we append
+ * '_SeqId_<id-when-loaded>' to the hfile name, unless
+ * "hbase.mapreduce.bulkload.assign.sequenceNumbers" is
+ * explicitly turned off.
+ * If "hbase.mapreduce.bulkload.assign.sequenceNumbers"
+ * is turned off, fall back to BULKLOAD_TIME_KEY.
+ * @return true if this storefile was created by bulk load.
*/
boolean isBulkLoadResult() {
- return metadataMap.containsKey(BULKLOAD_TIME_KEY);
+ boolean bulkLoadedHFile = false;
+ String fileName = this.getPath().getName();
+ int startPos = fileName.indexOf("SeqId_");
+ if (startPos != -1) {
+ bulkLoadedHFile = true;
+ }
+ return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
}
/**
* Return the timestamp at which this bulk load file was generated.
*/
public long getBulkLoadTimestamp() {
- return Bytes.toLong(metadataMap.get(BULKLOAD_TIME_KEY));
+ byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
+ return (bulkLoadTimestamp == null) ? 0 : Bytes.toLong(bulkLoadTimestamp);
}
/**
@@ -372,7 +385,8 @@ public class StoreFile {
// generate the sequenceId from the fileName
// fileName is of the form <randomName>_SeqId_<id-when-loaded>_
String fileName = this.getPath().getName();
- int startPos = fileName.indexOf("SeqId_");
+ // Use lastIndexOf() to get the last, most recent bulk load seqId.
+ int startPos = fileName.lastIndexOf("SeqId_");
if (startPos != -1) {
this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
fileName.indexOf('_', startPos + 6)));
http://git-wip-us.apache.org/repos/asf/hbase/blob/fd5b139a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index aa351d3..2784845 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -200,7 +200,7 @@ public class StoreFileScanner implements KeyValueScanner {
protected void setCurrentCell(Cell newVal) throws IOException {
this.cur = newVal;
- if (this.cur != null && this.reader.isBulkLoaded() && cur.getSequenceId() <= 0) {
+ if (this.cur != null && this.reader.isBulkLoaded()) {
CellUtil.setSequenceId(cur, this.reader.getSequenceID());
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/fd5b139a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
index 3ff6394..af4b9c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
@@ -73,7 +73,8 @@ public class TestScannerWithBulkload {
Scan scan = createScan();
final HTable table = init(admin, l, scan, tableName);
// use bulkload
- final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file");
+ final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file",
+ false);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
@@ -101,6 +102,7 @@ public class TestScannerWithBulkload {
}
result = scanner.next();
}
+ scanner.close();
table.close();
}
@@ -121,7 +123,10 @@ public class TestScannerWithBulkload {
return result;
}
- private Path writeToHFile(long l, String hFilePath, String pathStr) throws IOException {
+ // If nativeHFile is true, we will set cell seq id and MAX_SEQ_ID_KEY in the file.
+ // Else, we will set BULKLOAD_TIME_KEY.
+ private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
+ throws IOException {
FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
final Path hfilePath = new Path(hFilePath);
fs.mkdirs(hfilePath);
@@ -132,10 +137,26 @@ public class TestScannerWithBulkload {
HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
Bytes.toBytes("version2"));
+
+ // Set cell seq id to test bulk load native hfiles.
+ if (nativeHFile) {
+ // Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
+ // Scan should only look at the seq id appended at the bulk load time, and not skip
+ // this kv.
+ kv.setSequenceId(9999999);
+ }
+
writer.append(kv);
- // Add the bulk load time_key. otherwise we cannot ensure that it is a bulk
- // loaded file
+
+ if (nativeHFile) {
+ // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
+ // Scan should only look at the seq id appended at the bulk load time, and not skip its
+ // kv.
+ writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(new Long(9999999)));
+ }
+ else {
writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
+ }
writer.close();
return hfilePath;
}
@@ -182,7 +203,7 @@ public class TestScannerWithBulkload {
final HTable table = init(admin, l, scan, tableName);
// use bulkload
final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
- "/temp/testBulkLoadWithParallelScan/col/file");
+ "/temp/testBulkLoadWithParallelScan/col/file", false);
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
@@ -209,10 +230,55 @@ public class TestScannerWithBulkload {
// scanner
Result result = scanner.next();
scanAfterBulkLoad(scanner, result, "version1");
+ scanner.close();
table.close();
}
+ @Test
+ public void testBulkLoadNativeHFile() throws Exception {
+ String tableName = "testBulkLoadNativeHFile";
+ long l = System.currentTimeMillis();
+ HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+ createTable(admin, tableName);
+ Scan scan = createScan();
+ final HTable table = init(admin, l, scan, tableName);
+ // use bulkload
+ final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
+ "/temp/testBulkLoadNativeHFile/col/file", true);
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
+ final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+ bulkload.doBulkLoad(hfilePath, table);
+ ResultScanner scanner = table.getScanner(scan);
+ Result result = scanner.next();
+ // We had 'version0', 'version1' for 'row1,col:q' in the table.
+ // Bulk load added 'version2' scanner should be able to see 'version2'
+ result = scanAfterBulkLoad(scanner, result, "version2");
+ Put put0 = new Put(Bytes.toBytes("row1"));
+ put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
+ .toBytes("version3")));
+ table.put(put0);
+ table.flushCommits();
+ admin.flush(tableName);
+ scanner = table.getScanner(scan);
+ result = scanner.next();
+ while (result != null) {
+ List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
+ for (KeyValue _kv : kvs) {
+ if (Bytes.toString(_kv.getRow()).equals("row1")) {
+ System.out.println(Bytes.toString(_kv.getRow()));
+ System.out.println(Bytes.toString(_kv.getQualifier()));
+ System.out.println(Bytes.toString(_kv.getValue()));
+ Assert.assertEquals("version3", Bytes.toString(_kv.getValue()));
+ }
+ }
+ result = scanner.next();
+ }
+ scanner.close();
+ table.close();
+ }
+
private Scan createScan() {
Scan scan = new Scan();
scan.setMaxVersions(3);