You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2014/02/21 20:18:26 UTC

svn commit: r1570674 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java

Author: liyin
Date: Fri Feb 21 19:18:26 2014
New Revision: 1570674

URL: http://svn.apache.org/r1570674
Log:
[HBASE-10578] [0.89-fb] Choose the correct KV from multiple scanners

Author: aaiyer

Summary:
When multiple scanners have the same KV, HBase should pick the "newest" one.
However, if the "current" scanner has the same KV, it seems to get
preference over the other scanners in the heap -- THIS IS WRONG.

This issue has been causing Relja problems.  The diff adds a unit test to make
sure that bulk load works correctly, and fixes the issue.

Test Plan: Added a unit test

Reviewers: gauravm, liyintang, rshroff

Reviewed By: rshroff

CC: hbase-dev@, rpetrovic, daviddeng

Differential Revision: https://phabricator.fb.com/D1183013

Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=1570674&r1=1570673&r2=1570674&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Fri Feb 21 19:18:26 2014
@@ -102,7 +102,7 @@ public class KeyValueHeap extends NonLaz
     } else {
       KeyValueScanner topScanner = this.heap.peek();
       if (topScanner == null ||
-          this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
+          this.comparator.compare(this.current, topScanner) > 0) {
         this.heap.add(this.current);
         this.current = pollRealKV();
       }
@@ -381,7 +381,7 @@ public class KeyValueHeap extends NonLaz
           // Compare the current scanner to the next scanner. We try to avoid
           // putting the current one back into the heap if possible.
           KeyValue nextKV = nextEarliestScanner.peek();
-          if (nextKV == null || comparator.compare(curKV, nextKV) <= 0) {
+          if (nextKV == null || comparator.compare(kvScanner, nextEarliestScanner) <= 0) {
             // We already have the scanner with the earliest KV, so return it.
             return kvScanner;
           }

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java?rev=1570674&r1=1570673&r2=1570674&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/mapreduce/TestLoadIncrementalHFiles.java Fri Feb 21 19:18:26 2014
@@ -24,18 +24,20 @@ import static org.junit.Assert.assertTru
 
 import java.io.IOException;
 import java.util.List;
+import java.util.NavigableMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
@@ -43,8 +45,11 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.regionserver.TestStore;
 import org.apache.hadoop.hbase.regionserver.TestStoreFile;
+import org.apache.hadoop.hbase.regionserver.TimeRangeTracker;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.WritableUtils;
 import org.junit.Test;
+import org.mortbay.log.Log;
 
 /**
  * Test cases for the "load" half of the HFileOutputFormat bulk load
@@ -154,6 +159,103 @@ public class TestLoadIncrementalHFiles {
     }
   }
 
+  @Test
+  public void testLoadWithSeqNumNoFlush() throws Exception {
+    testLoadWithSeqNum(false);
+  }
+
+  @Test
+  public void testLoadWithSeqNumWithFlush() throws Exception {
+    testLoadWithSeqNum(true);
+  }
+
+  private void testLoadWithSeqNum(boolean flush) throws Exception {
+    String testName = "bulkLoadSensibility";
+    Path dir1 = util.getTestDir(testName + "1");
+    Path dir2 = util.getTestDir(testName + "2");
+    FileSystem fs = util.getTestFileSystem();
+    dir1 = dir1.makeQualified(fs);
+    dir2 = dir2.makeQualified(fs);
+    Path familyDir1 = new Path(dir1, Bytes.toString(FAMILY));
+    Path familyDir2 = new Path(dir2, Bytes.toString(FAMILY));
+    byte[] row = Bytes.toBytes("key");
+    long timestamp = 1000;
+
+    int hfileIdx = 0;
+    Configuration config = util.getConfiguration();
+    Log.info("Creating first bulkloaded file");
+    createHFileForKV(config, fs, new Path(familyDir1, "hfile_"
+        + hfileIdx++), row, FAMILY, QUALIFIER, timestamp, Bytes.toBytes("bulkValue"));
+
+    util.startMiniCluster();
+    try {
+      HBaseAdmin admin = new HBaseAdmin(config);
+      HTableDescriptor htd = new HTableDescriptor(TABLE);
+      htd.addFamily(new HColumnDescriptor(FAMILY));
+      // Do not worry about splitting the keys
+      admin.createTable(htd);
+
+      HTable table = new HTable(config, TABLE);
+      Log.info("Waiting for HTable to be available");
+      util.waitTableAvailable(TABLE, 30000);
+
+      // Do a dummy put to increase the hlog sequence number
+      Log.info("Doing a htable.put()");
+      Put put = new Put(row);
+      put.add(FAMILY, QUALIFIER, timestamp, Bytes.toBytes("singlePutValue"));
+      table.put(put);
+
+      if (flush) {
+        // flush the table
+        Log.info("Flushing the table ");
+        table.flushRegionForRow(row, 0);
+      }
+
+      config.setBoolean(LoadIncrementalHFiles.ASSIGN_SEQ_IDS, false);
+      Log.info("Loading the first bulkload file");
+      LoadIncrementalHFiles loader1 = new LoadIncrementalHFiles(
+          config);
+      loader1.doBulkLoad(dir1, table);
+
+      Log.info("Getting after the first bulkloaded file");
+      Get get = new Get(row);
+      get.addColumn(FAMILY, QUALIFIER);
+      get.setTimeStamp(timestamp);
+
+      Result result = table.get(get);
+      NavigableMap<Long, byte[]> navigableMap =
+          result.getMap().get(FAMILY).get(QUALIFIER);
+      assertEquals("singlePutValue", Bytes.toString(navigableMap.get(timestamp)));
+
+      Log.info("Creating second bulkload file");
+      createHFileForKV(config, fs, new Path(familyDir2, "hfile_"
+          + hfileIdx++), row, FAMILY, QUALIFIER, timestamp, Bytes.toBytes("bulkValue2"));
+
+      config.setBoolean(LoadIncrementalHFiles.ASSIGN_SEQ_IDS, true);
+      LoadIncrementalHFiles loader2 = new LoadIncrementalHFiles(
+          config);
+
+      Log.info("Loading the second bulkload file");
+      loader2.doBulkLoad(dir2, table);
+
+      Log.info("Verifying get after 2nd bulk load");
+      result = table.get(get);
+      navigableMap =
+          result.getMap().get(FAMILY).get(QUALIFIER);
+      if (flush) {
+        // no value in memstore
+        // the latest bulk load file should win
+        assertEquals("bulkValue2", Bytes.toString(navigableMap.get(timestamp)));
+      } else {
+        // The value in memstore wins
+        assertEquals("singlePutValue", Bytes.toString(navigableMap.get(timestamp)));
+      }
+    } finally {
+      util.shutdownMiniCluster();
+    }
+
+  }
+
   private void verifyAssignedSequenceNumber(String testName,
       byte[][][] hfileRanges, boolean nonZero) throws Exception {
     Path dir = util.getTestDir(testName);
@@ -262,16 +364,47 @@ public class TestLoadIncrementalHFiles {
         .withCompression(COMPRESSION)
         .withComparator(KeyValue.KEY_COMPARATOR)
         .create();
+    TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
     long now = System.currentTimeMillis();
     try {
       // subtract 2 since iterateOnSplits doesn't include boundary keys
       for (byte[] key : Bytes.iterateOnSplits(startKey, endKey, numRows-2)) {
         KeyValue kv = new KeyValue(key, family, qualifier, now, key);
         writer.append(kv);
+        timeRangeTracker.includeTimestamp(kv);
       }
     } finally {
       writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
           Bytes.toBytes(System.currentTimeMillis()));
+      writer.appendFileInfo(StoreFile.TIMERANGE_KEY,
+          WritableUtils.toByteArray(timeRangeTracker));
+      writer.close();
+    }
+  }
+
+  static void createHFileForKV(
+      Configuration conf,
+      FileSystem fs, Path path,
+      byte[] key,
+      byte[] family, byte[] qualifier,
+      long timestamp, byte[] value) throws IOException
+  {
+    HFile.Writer writer = HFile.getWriterFactory(conf, new CacheConfig(conf))
+        .withPath(fs, path)
+        .withBlockSize(BLOCKSIZE)
+        .withCompression(COMPRESSION)
+        .withComparator(KeyValue.KEY_COMPARATOR)
+        .create();
+    TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
+    try {
+      KeyValue kv = new KeyValue(key, family, qualifier, timestamp, value);
+      writer.append(kv);
+      timeRangeTracker.includeTimestamp(kv);
+    } finally {
+      writer.appendFileInfo(StoreFile.BULKLOAD_TIME_KEY,
+          Bytes.toBytes(System.currentTimeMillis()));
+      writer.appendFileInfo(StoreFile.TIMERANGE_KEY,
+          WritableUtils.toByteArray(timeRangeTracker));
       writer.close();
     }
   }