You are viewing a plain-text version of this content; the link to its canonical version was not preserved in this copy.
Posted to commits@hbase.apache.org by te...@apache.org on 2014/09/08 16:49:27 UTC

git commit: HBASE-11772 Bulk load mvcc and seqId issues with native hfiles (Jerry He)

Repository: hbase
Updated Branches:
  refs/heads/branch-1 ecb015d9a -> fd5b139a6


HBASE-11772 Bulk load mvcc and seqId issues with native hfiles (Jerry He)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/fd5b139a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/fd5b139a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/fd5b139a

Branch: refs/heads/branch-1
Commit: fd5b139a6f60970f575499d5dfb0aa762c590696
Parents: ecb015d
Author: Ted Yu <te...@apache.org>
Authored: Mon Sep 8 14:49:16 2014 +0000
Committer: Ted Yu <te...@apache.org>
Committed: Mon Sep 8 14:49:16 2014 +0000

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/StoreFile.java    | 24 +++++--
 .../hbase/regionserver/StoreFileScanner.java    |  2 +-
 .../regionserver/TestScannerWithBulkload.java   | 76 ++++++++++++++++++--
 3 files changed, 91 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/fd5b139a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 27c64f0..6a45c47 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -315,18 +315,31 @@ public class StoreFile {
   }
 
   /**
-   * @return true if this storefile was created by HFileOutputFormat
-   * for a bulk load.
+   * Check if this storefile was created by bulk load.
+   * When a hfile is bulk loaded into HBase, we append
+   * '_SeqId_<id-when-loaded>' to the hfile name, unless
+   * "hbase.mapreduce.bulkload.assign.sequenceNumbers" is
+   * explicitly turned off.
+   * If "hbase.mapreduce.bulkload.assign.sequenceNumbers"
+   * is turned off, fall back to BULKLOAD_TIME_KEY.
+   * @return true if this storefile was created by bulk load.
    */
   boolean isBulkLoadResult() {
-    return metadataMap.containsKey(BULKLOAD_TIME_KEY);
+    boolean bulkLoadedHFile = false;
+    String fileName = this.getPath().getName();
+    int startPos = fileName.indexOf("SeqId_");
+    if (startPos != -1) {
+      bulkLoadedHFile = true;
+    }
+    return bulkLoadedHFile || metadataMap.containsKey(BULKLOAD_TIME_KEY);
   }
 
   /**
    * Return the timestamp at which this bulk load file was generated.
    */
   public long getBulkLoadTimestamp() {
-    return Bytes.toLong(metadataMap.get(BULKLOAD_TIME_KEY));
+    byte[] bulkLoadTimestamp = metadataMap.get(BULKLOAD_TIME_KEY);
+    return (bulkLoadTimestamp == null) ? 0 : Bytes.toLong(bulkLoadTimestamp);
   }
 
   /**
@@ -372,7 +385,8 @@ public class StoreFile {
       // generate the sequenceId from the fileName
       // fileName is of the form <randomName>_SeqId_<id-when-loaded>_
       String fileName = this.getPath().getName();
-      int startPos = fileName.indexOf("SeqId_");
+      // Use lastIndexOf() to get the last, most recent bulk load seqId.
+      int startPos = fileName.lastIndexOf("SeqId_");
       if (startPos != -1) {
         this.sequenceid = Long.parseLong(fileName.substring(startPos + 6,
             fileName.indexOf('_', startPos + 6)));

http://git-wip-us.apache.org/repos/asf/hbase/blob/fd5b139a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index aa351d3..2784845 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -200,7 +200,7 @@ public class StoreFileScanner implements KeyValueScanner {
 
   protected void setCurrentCell(Cell newVal) throws IOException {
     this.cur = newVal;
-    if (this.cur != null && this.reader.isBulkLoaded() && cur.getSequenceId() <= 0) {
+    if (this.cur != null && this.reader.isBulkLoaded()) {
       CellUtil.setSequenceId(cur, this.reader.getSequenceID());
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/fd5b139a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
index 3ff6394..af4b9c5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerWithBulkload.java
@@ -73,7 +73,8 @@ public class TestScannerWithBulkload {
     Scan scan = createScan();
     final HTable table = init(admin, l, scan, tableName);
     // use bulkload
-    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file");
+    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoad/", "/temp/testBulkLoad/col/file",
+      false);
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
     final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
@@ -101,6 +102,7 @@ public class TestScannerWithBulkload {
       }
       result = scanner.next();
     }
+    scanner.close();
     table.close();
   }
 
@@ -121,7 +123,10 @@ public class TestScannerWithBulkload {
     return result;
   }
 
-  private Path writeToHFile(long l, String hFilePath, String pathStr) throws IOException {
+  // If nativeHFile is true, we will set cell seq id and MAX_SEQ_ID_KEY in the file.
+  // Else, we will set BULKLOAD_TIME_KEY.
+  private Path writeToHFile(long l, String hFilePath, String pathStr, boolean nativeHFile)
+      throws IOException {
     FileSystem fs = FileSystem.get(TEST_UTIL.getConfiguration());
     final Path hfilePath = new Path(hFilePath);
     fs.mkdirs(hfilePath);
@@ -132,10 +137,26 @@ public class TestScannerWithBulkload {
     HFile.Writer writer = wf.withPath(fs, path).withFileContext(context).create();
     KeyValue kv = new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l,
         Bytes.toBytes("version2"));
+
+    // Set cell seq id to test bulk load native hfiles.
+    if (nativeHFile) {
+      // Set a big seq id. Scan should not look at this seq id in a bulk loaded file.
+      // Scan should only look at the seq id appended at the bulk load time, and not skip
+      // this kv.
+      kv.setSequenceId(9999999);
+    }
+
     writer.append(kv);
-    // Add the bulk load time_key. otherwise we cannot ensure that it is a bulk
-    // loaded file
+
+    if (nativeHFile) {
+      // Set a big MAX_SEQ_ID_KEY. Scan should not look at this seq id in a bulk loaded file.
+      // Scan should only look at the seq id appended at the bulk load time, and not skip its
+      // kv.
+      writer.appendFileInfo(StoreFile.MAX_SEQ_ID_KEY, Bytes.toBytes(new Long(9999999)));
+    }
+    else {
     writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(System.currentTimeMillis()));
+    }
     writer.close();
     return hfilePath;
   }
@@ -182,7 +203,7 @@ public class TestScannerWithBulkload {
     final HTable table = init(admin, l, scan, tableName);
     // use bulkload
     final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadWithParallelScan/",
-        "/temp/testBulkLoadWithParallelScan/col/file");
+        "/temp/testBulkLoadWithParallelScan/col/file", false);
     Configuration conf = TEST_UTIL.getConfiguration();
     conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
     final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
@@ -209,10 +230,55 @@ public class TestScannerWithBulkload {
     // scanner
     Result result = scanner.next();
     scanAfterBulkLoad(scanner, result, "version1");
+    scanner.close();
     table.close();
 
   }
 
+  @Test
+  public void testBulkLoadNativeHFile() throws Exception {
+    String tableName = "testBulkLoadNativeHFile";
+    long l = System.currentTimeMillis();
+    HBaseAdmin admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
+    createTable(admin, tableName);
+    Scan scan = createScan();
+    final HTable table = init(admin, l, scan, tableName);
+    // use bulkload
+    final Path hfilePath = writeToHFile(l, "/temp/testBulkLoadNativeHFile/",
+      "/temp/testBulkLoadNativeHFile/col/file", true);
+    Configuration conf = TEST_UTIL.getConfiguration();
+    conf.setBoolean("hbase.mapreduce.bulkload.assign.sequenceNumbers", true);
+    final LoadIncrementalHFiles bulkload = new LoadIncrementalHFiles(conf);
+    bulkload.doBulkLoad(hfilePath, table);
+    ResultScanner scanner = table.getScanner(scan);
+    Result result = scanner.next();
+    // We had 'version0', 'version1' for 'row1,col:q' in the table.
+    // Bulk load added 'version2'  scanner should be able to see 'version2'
+    result = scanAfterBulkLoad(scanner, result, "version2");
+    Put put0 = new Put(Bytes.toBytes("row1"));
+    put0.add(new KeyValue(Bytes.toBytes("row1"), Bytes.toBytes("col"), Bytes.toBytes("q"), l, Bytes
+        .toBytes("version3")));
+    table.put(put0);
+    table.flushCommits();
+    admin.flush(tableName);
+    scanner = table.getScanner(scan);
+    result = scanner.next();
+    while (result != null) {
+      List<KeyValue> kvs = result.getColumn(Bytes.toBytes("col"), Bytes.toBytes("q"));
+      for (KeyValue _kv : kvs) {
+        if (Bytes.toString(_kv.getRow()).equals("row1")) {
+          System.out.println(Bytes.toString(_kv.getRow()));
+          System.out.println(Bytes.toString(_kv.getQualifier()));
+          System.out.println(Bytes.toString(_kv.getValue()));
+          Assert.assertEquals("version3", Bytes.toString(_kv.getValue()));
+        }
+      }
+      result = scanner.next();
+    }
+    scanner.close();
+    table.close();
+  }
+
   private Scan createScan() {
     Scan scan = new Scan();
     scan.setMaxVersions(3);