You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by an...@apache.org on 2015/06/10 14:35:40 UTC

hbase git commit: HBASE-13836 Do not reset the mvcc for bulk loaded mob reference cells in reading. (Jingcheng)

Repository: hbase
Updated Branches:
  refs/heads/hbase-11339 ba348cf5a -> 26893aa45


HBASE-13836 Do not reset the mvcc for bulk loaded mob reference cells in reading. (Jingcheng)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/26893aa4
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/26893aa4
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/26893aa4

Branch: refs/heads/hbase-11339
Commit: 26893aa451215ef0395b7df16f129414b7b86c86
Parents: ba348cf
Author: anoopsjohn <an...@gmail.com>
Authored: Wed Jun 10 18:05:10 2015 +0530
Committer: anoopsjohn <an...@gmail.com>
Committed: Wed Jun 10 18:05:10 2015 +0530

----------------------------------------------------------------------
 .../compactions/PartitionedMobCompactor.java    |  1 +
 .../hadoop/hbase/regionserver/StoreFile.java    | 34 ++++++++++
 .../hbase/regionserver/StoreFileScanner.java    |  2 +-
 .../hbase/mob/compactions/TestMobCompactor.java | 70 ++++++++++++++++++--
 4 files changed, 101 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/26893aa4/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
index 065787e..8cda746 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mob/compactions/PartitionedMobCompactor.java
@@ -588,6 +588,7 @@ public class PartitionedMobCompactor extends MobCompactor {
     if (writer != null) {
       writer.appendMetadata(maxSeqId, false);
       writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY, Bytes.toBytes(bulkloadTime));
+      writer.appendFileInfo(StoreFile.SKIP_RESET_SEQ_ID, Bytes.toBytes(true));
       try {
         writer.close();
       } catch (IOException e) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/26893aa4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
index 7001acb..009ba0d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
@@ -165,6 +165,13 @@ public class StoreFile {
   private final BloomType cfBloomType;
 
   /**
+   * Metadata key that controls whether resetting the sequence id is skipped.
+   * For bulk loaded hfiles, the scanner resets the cell seqId with the latest one.
+   * If this metadata is set to true, the reset is skipped.
+   */
+  public static final byte[] SKIP_RESET_SEQ_ID = Bytes.toBytes("SKIP_RESET_SEQ_ID");
+
+  /**
    * Constructor, loads a reader and it's indices, etc. May allocate a
    * substantial amount of ram depending on the underlying files (10-20MB?).
    *
@@ -407,6 +414,12 @@ public class StoreFile {
           this.sequenceid += 1;
         }
       }
+      // SKIP_RESET_SEQ_ID only takes effect in bulk loaded files.
+      // In mob compaction, the hfile whose cells contain the path of a new mob file is bulk
+      // loaded to hbase; these cells have the same seqIds as the old ones. We do not want
+      // to assign new seqIds to them, since this might mess up the visibility of cells that
+      // have the same row key but different seqIds.
+      this.reader.setSkipResetSeqId(isSkipResetSeqId(metadataMap.get(SKIP_RESET_SEQ_ID)));
       this.reader.setBulkLoaded(true);
     }
     this.reader.setSequenceID(this.sequenceid);
@@ -536,6 +549,18 @@ public class StoreFile {
     return sb.toString();
   }
 
+  /**
+   * Gets whether to skip resetting the sequence id for cells.
+   * @param skipResetSeqId The serialized boolean read from the file metadata, may be null.
+   * @return Whether to skip resetting the sequence id; false if null or not a single byte.
+   */
+  private boolean isSkipResetSeqId(byte[] skipResetSeqId) {
+    if (skipResetSeqId != null && skipResetSeqId.length == 1) {
+      return Bytes.toBoolean(skipResetSeqId);
+    }
+    return false;
+  }
+
   public static class WriterBuilder {
     private final Configuration conf;
     private final CacheConfig cacheConf;
@@ -1068,6 +1093,7 @@ public class StoreFile {
     private long deleteFamilyCnt = -1;
     private boolean bulkLoadResult = false;
     private KeyValue.KeyOnlyKeyValue lastBloomKeyOnlyKV = null;
+    private boolean skipResetSeqId = true;
 
     public Reader(FileSystem fs, Path path, CacheConfig cacheConf, Configuration conf)
         throws IOException {
@@ -1594,6 +1620,14 @@ public class StoreFile {
     public long getMaxTimestamp() {
       return timeRangeTracker == null ? Long.MAX_VALUE : timeRangeTracker.getMaximumTimestamp();
     }
+
+    boolean isSkipResetSeqId() {
+      return skipResetSeqId;
+    }
+
+    void setSkipResetSeqId(boolean skipResetSeqId) {
+      this.skipResetSeqId = skipResetSeqId;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/26893aa4/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
index 42a378d..1111a61 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
@@ -208,7 +208,7 @@ public class StoreFileScanner implements KeyValueScanner {
 
   protected void setCurrentCell(Cell newVal) throws IOException {
     this.cur = newVal;
-    if (this.cur != null && this.reader.isBulkLoaded()) {
+    if (this.cur != null && this.reader.isBulkLoaded() && !this.reader.isSkipResetSeqId()) {
       CellUtil.setSequenceId(cur, this.reader.getSequenceID());
     }
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/26893aa4/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
index d63bb95..380ebac 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mob/compactions/TestMobCompactor.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.security.Key;
 import java.security.SecureRandom;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
@@ -177,11 +179,11 @@ public class TestMobCompactor {
     TableName tableName = TableName.valueOf(tableNameAsString);
     HColumnDescriptor hcd1 = new HColumnDescriptor(family1);
     hcd1.setMobEnabled(true);
-    hcd1.setMobThreshold(0L);
+    hcd1.setMobThreshold(5);
     hcd1.setMaxVersions(4);
     HColumnDescriptor hcd2 = new HColumnDescriptor(family2);
     hcd2.setMobEnabled(true);
-    hcd2.setMobThreshold(0L);
+    hcd2.setMobThreshold(5);
     hcd2.setMaxVersions(4);
     HTableDescriptor desc = new HTableDescriptor(tableName);
     desc.addFamily(hcd1);
@@ -570,7 +572,7 @@ public class TestMobCompactor {
     // do the mob compaction
     admin.compactMob(tableName, hcd1.getName());
 
-    waitUntilCompactionFinished(tableName);
+    waitUntilMobCompactionFinished(tableName);
     assertEquals("After compaction: mob rows count", regionNum * (rowNumPerRegion - delRowNum),
       countMobRows(hTable));
     assertEquals("After compaction: mob cells count", regionNum
@@ -618,7 +620,7 @@ public class TestMobCompactor {
     // do the major mob compaction, it will force all files to compaction
     admin.majorCompactMob(tableName, hcd1.getName());
 
-    waitUntilCompactionFinished(tableName);
+    waitUntilMobCompactionFinished(tableName);
     assertEquals("After compaction: mob rows count", regionNum*(rowNumPerRegion-delRowNum),
         countMobRows(hTable));
     assertEquals("After compaction: mob cells count",
@@ -633,7 +635,58 @@ public class TestMobCompactor {
         countFiles(tableName, false, family2));
   }
 
-  private void waitUntilCompactionFinished(TableName tableName) throws IOException,
+  @Test
+  public void testScannerOnBulkLoadRefHFiles() throws Exception {
+    long ts = EnvironmentEdgeManager.currentTime();
+    byte[] key0 = Bytes.toBytes("k0");
+    byte[] key1 = Bytes.toBytes("k1");
+    String value0 = "mobValue0";
+    String value1 = "mobValue1";
+    String newValue0 = "new";
+    Put put0 = new Put(key0);
+    put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value0));
+    loadData(admin, bufMut, tableName, new Put[] { put0 });
+    put0 = new Put(key0);
+    put0.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(newValue0));
+    Put put1 = new Put(key1);
+    put1.addColumn(Bytes.toBytes(family1), Bytes.toBytes(qf1), ts, Bytes.toBytes(value1));
+    loadData(admin, bufMut, tableName, new Put[] { put0, put1 });
+    // read the latest cell of key0.
+    Get get = new Get(key0);
+    Result result = hTable.get(get);
+    Cell cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
+    assertEquals("Before compaction: mob value of k0", "new",
+      Bytes.toString(CellUtil.cloneValue(cell)));
+    admin.majorCompactMob(tableName, hcd1.getName());
+    waitUntilMobCompactionFinished(tableName);
+    // read the latest cell of key0 again. The cell seqIds in the bulk loaded file are not reset
+    // by the scanner, so the cell that has the "new" value is still visible.
+    result = hTable.get(get);
+    cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
+    assertEquals("After compaction: mob value of k0", "new",
+      Bytes.toString(CellUtil.cloneValue(cell)));
+    // read only the ref cell; do not read further into the mob cell.
+    get.setAttribute(MobConstants.MOB_SCAN_RAW, Bytes.toBytes(true));
+    result = hTable.get(get);
+    cell = result.getColumnLatestCell(hcd1.getName(), Bytes.toBytes(qf1));
+    // the ref name is the new file
+    Path mobFamilyPath = new Path(MobUtils.getMobRegionPath(TEST_UTIL.getConfiguration(),
+      tableName), hcd1.getNameAsString());
+    List<Path> paths = new ArrayList<Path>();
+    if (fs.exists(mobFamilyPath)) {
+      FileStatus[] files = fs.listStatus(mobFamilyPath);
+      for (FileStatus file : files) {
+        if (!StoreFileInfo.isDelFile(file.getPath())) {
+          paths.add(file.getPath());
+        }
+      }
+    }
+    assertEquals("After compaction: number of mob files:", 1, paths.size());
+    assertEquals("After compaction: mob file name:", MobUtils.getMobFileName(cell), paths.get(0)
+      .getName());
+  }
+
+  private void waitUntilMobCompactionFinished(TableName tableName) throws IOException,
     InterruptedException {
     long finished = EnvironmentEdgeManager.currentTime() + 60000;
     CompactionState state = admin.getMobCompactionState(tableName);
@@ -804,6 +857,13 @@ public class TestMobCompactor {
     }
   }
 
+  private void loadData(Admin admin, BufferedMutator table, TableName tableName, Put[] puts)
+    throws IOException {
+    table.mutate(Arrays.asList(puts));
+    table.flush();
+    admin.flush(tableName);
+  }
+
   /**
    * delete the row, family and cell to create the del file
    */