You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by li...@apache.org on 2012/11/16 01:02:45 UTC

svn commit: r1410118 - in /hbase/branches/0.89-fb/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/client/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/

Author: liyin
Date: Fri Nov 16 00:02:44 2012
New Revision: 1410118

URL: http://svn.apache.org/viewvc?rev=1410118&view=rev
Log:
[HBASE-2376] Introduce an API to perform FlashBackQueries.

Author: pritam

Summary:
Updated the relevant sections of scanning to make sure the
FlashBackQuery API is supported completely. I also changed the writables
for Get and Scan. I removed some version checking during write which I
felt was unnecessary (maybe I'm not clear on why we had that, but feel
free to point out anything that I obviously missed).

This is on top of D606550

Test Plan: Unit tests added.

Reviewers: kannan, kranganathan, liyintang, aaiyer, adela, cjin

Reviewed By: liyintang

CC: hbase-eng@, erling

Differential Revision: https://phabricator.fb.com/D618553

Task ID: 1733764

Added:
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/FlashBackQueryTestUtil.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQuery.java
Modified:
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
    hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQueryCompaction.java

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1410118&r1=1410117&r2=1410118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/KeyValue.java Fri Nov 16 00:02:44 2012
@@ -1677,18 +1677,29 @@ public class KeyValue implements Writabl
     return new KeyValue(row, family, qualifier, HConstants.LATEST_TIMESTAMP, Type.Maximum);
   }
 
+  /** Delete Family key for row/family stamped HConstants.LATEST_TIMESTAMP. */
+  public static KeyValue createFirstDeleteFamilyOnRow(final byte[] row,
+      final byte[] family) {
+    return createDeleteFamilyOnRow(row, family, HConstants.LATEST_TIMESTAMP);
+  }
+
   /**
    * Create a Delete Family KeyValue for the specified row and family that would
    * be smaller than all other possible Delete Family KeyValues that have the
-   * same row and family.
-   * Used for seeking.
-   * @param row - row key (arbitrary byte array)
-   * @param family - family name
-   * @return First Delete Family possible key on passed <code>row</code>.
-   */
-  public static KeyValue createFirstDeleteFamilyOnRow(final byte [] row,
-      final byte [] family) {
-    return new KeyValue(row, family, null, HConstants.LATEST_TIMESTAMP,
+   * same row and family. Used for seeking.
+   * NOTE(review): "smaller than all" presumably assumes ts = LATEST_TIMESTAMP.
+   * @param row
+   *          - row key (arbitrary byte array)
+   * @param family
+   *          - family name
+   * @param ts
+   *          - timestamp to put on the Delete Family key
+   * @return the Delete Family key for the passed <code>row</code>,
+   * <code>family</code> and <code>ts</code>.
+   */
+  public static KeyValue createDeleteFamilyOnRow(final byte [] row,
+      final byte[] family, long ts) {
+    return new KeyValue(row, family, null, ts,
         Type.DeleteFamily);
   }
 

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java?rev=1410118&r1=1410117&r2=1410118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Get.java Fri Nov 16 00:02:44 2012
@@ -69,7 +69,10 @@ import java.util.TreeSet;
  */
 public class Get extends OperationWithAttributes
   implements Writable, Row, Comparable<Row> {
-  private static final byte GET_VERSION = (byte)3;
+  private static final byte STORE_LIMIT_VERSION = (byte) 2;
+  private static final byte STORE_OFFSET_VERSION = (byte) 3;
+  private static final byte FLASHBACK_VERSION = (byte) 4;
+  private static final byte GET_VERSION = FLASHBACK_VERSION;
 
   private byte [] row = null;
   private long lockId = -1L;
@@ -80,6 +83,8 @@ public class Get extends OperationWithAt
   private TimeRange tr = new TimeRange();
   private Map<byte [], NavigableSet<byte []>> familyMap =
     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
+  // Operation should be performed as if it was performed at the given ts.
+  private long effectiveTS = HConstants.LATEST_TIMESTAMP;
 
   /** Constructor for Writable.  DO NOT USE */
   public Get() {}
@@ -183,6 +188,16 @@ public class Get extends OperationWithAt
   }
 
   /**
+   * Set the effective timestamp for this get.
+   * @param effectiveTS the get is performed as if issued at this timestamp
+   * @return this for invocation chaining
+   */
+  public Get setEffectiveTS(long effectiveTS) {
+    this.effectiveTS = effectiveTS;
+    return this;
+  }
+
+  /**
    * Get up to the specified number of versions of each column.
    * @param maxVersions maximum versions for each column
    * @throws IOException if invalid number of versions
@@ -270,6 +285,13 @@ public class Get extends OperationWithAt
   }
 
   /**
+   * @return the effective timestamp (HConstants.LATEST_TIMESTAMP if unset).
+   */
+  public long getEffectiveTS() {
+    return this.effectiveTS;
+  }
+
+  /**
    * Method for retrieving the get's maximum number of values
    * to return per Column Family
    * @return the maximum number of values to fetch per CF
@@ -412,12 +434,15 @@ public class Get extends OperationWithAt
     this.row = Bytes.readByteArray(in);
     this.lockId = in.readLong();
     this.maxVersions = in.readInt();
-    if (version > 1) {
+    if (version >= STORE_LIMIT_VERSION) {
       this.storeLimit = in.readInt();
     }
-    if (version > 2) {
+    if (version >= STORE_OFFSET_VERSION) {
       this.storeOffset = in.readInt();
     }
+    if (version >= FLASHBACK_VERSION) {
+      effectiveTS = in.readLong();
+    }
     boolean hasFilter = in.readBoolean();
     if (hasFilter) {
       this.filter = (Filter)createForName(Bytes.toString(Bytes.readByteArray(in)));
@@ -446,22 +471,29 @@ public class Get extends OperationWithAt
 
   public void write(final DataOutput out)
   throws IOException {
-    byte version = (byte)1;
-    if (this.storeOffset != 0) {
-      version = GET_VERSION;
+    // We try to talk a protocol version as low as possible so that we can be
+    // backward compatible as far as possible.
+    byte version = (byte) 1;
+    if (effectiveTS != HConstants.LATEST_TIMESTAMP) {
+      version = FLASHBACK_VERSION;
+    } else if (this.storeOffset != 0) {
+      version = STORE_OFFSET_VERSION;
     } else if (this.storeLimit != -1) {
-      version = (byte)2;
+      version = STORE_LIMIT_VERSION;
     }
     out.writeByte(version);
     Bytes.writeByteArray(out, this.row);
     out.writeLong(this.lockId);
     out.writeInt(this.maxVersions);
-    if (version > 1) {
+    if (version >= STORE_LIMIT_VERSION) {
       out.writeInt(this.storeLimit);
     }
-    if (version > 2) {
+    if (version >= STORE_OFFSET_VERSION) {
       out.writeInt(this.storeOffset);
     }
+    if (version >= FLASHBACK_VERSION) {
+      out.writeLong(effectiveTS);
+    }
     if(this.filter == null) {
       out.writeBoolean(false);
     } else {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1410118&r1=1410117&r2=1410118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/Scan.java Fri Nov 16 00:02:44 2012
@@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.IncompatibleFilterException;
 import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
@@ -83,11 +84,12 @@ import java.util.TreeSet;
  * execute {@link #setCacheBlocks(boolean)}.
  */
 public class Scan extends Operation implements Writable {
-  private static final int VERSION_STORE_LIMIT = 2;
-  private static final int VERSION_STORE_OFFSET = 3;
-  private static final int VERSION_RESPONSE_SIZE = 4;
-  private static final byte SCAN_VERSION = VERSION_RESPONSE_SIZE;
-  
+  private static final byte STORE_LIMIT_VERSION = (byte)2;
+  private static final byte STORE_OFFSET_VERSION = (byte)3;
+  private static final byte RESPONSE_SIZE_VERSION = (byte)4;
+  private static final byte FLASHBACK_VERSION = (byte) 5;
+  private static final byte SCAN_VERSION = FLASHBACK_VERSION;
+
   private byte [] startRow = HConstants.EMPTY_START_ROW;
   private byte [] stopRow  = HConstants.EMPTY_END_ROW;
   private int maxVersions = 1;
@@ -103,6 +105,7 @@ public class Scan extends Operation impl
   private TimeRange tr = new TimeRange();
   private Map<byte [], NavigableSet<byte []>> familyMap =
     new TreeMap<byte [], NavigableSet<byte []>>(Bytes.BYTES_COMPARATOR);
+  private long effectiveTS = HConstants.LATEST_TIMESTAMP;
 
   /**
    * Create a Scan operation across all rows.
@@ -180,6 +183,7 @@ public class Scan extends Operation impl
     this.storeOffset = get.getRowOffsetPerColumnFamily();
     this.tr = get.getTimeRange();
     this.familyMap = get.getFamilyMap();
+    this.effectiveTS = get.getEffectiveTS();
   }
 
   public boolean isGetScan() {
@@ -282,6 +286,16 @@ public class Scan extends Operation impl
   }
 
   /**
+   * Set the effective timestamp of this operation.
+   * @param effectiveTS timestamp as of which the scan should be evaluated
+   * @return this
+   */
+  public Scan setEffectiveTS(long effectiveTS) {
+    this.effectiveTS = effectiveTS;
+    return this;
+  }
+
+  /**
    * Get all available versions.
    * @return this
    */
@@ -465,6 +479,13 @@ public class Scan extends Operation impl
   }
 
   /**
+   * @return the effective timestamp (HConstants.LATEST_TIMESTAMP if unset)
+   */
+  public long getEffectiveTS() {
+    return this.effectiveTS;
+  }
+
+  /**
    * @return maximum number of values to return for a single call to next()
    */
   public int getBatch() {
@@ -641,16 +662,19 @@ public class Scan extends Operation impl
     this.stopRow = Bytes.readByteArray(in);
     this.maxVersions = in.readInt();
     this.batch = in.readInt();
-    if (version >= VERSION_STORE_LIMIT) {
+    if (version >= STORE_LIMIT_VERSION) {
       this.storeLimit = in.readInt();
     }
-    if (version >= VERSION_STORE_OFFSET) {
+    if (version >= STORE_OFFSET_VERSION) {
       this.storeOffset = in.readInt();
     }
-    if (version >= VERSION_RESPONSE_SIZE) {
+    if (version >= RESPONSE_SIZE_VERSION) {
       this.maxResponseSize = in.readInt();
       this.partialRow = in.readBoolean();
     }
+    if (version >= FLASHBACK_VERSION) {
+      effectiveTS = in.readLong();
+    }
     this.caching = in.readInt();
     this.cacheBlocks = in.readBoolean();
     if(in.readBoolean()) {
@@ -676,13 +700,18 @@ public class Scan extends Operation impl
 
   public void write(final DataOutput out)
   throws IOException {
-    byte version = (byte)1;
-    if (this.maxResponseSize != HConstants.DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE) {
-      version = (byte) VERSION_RESPONSE_SIZE;
+    // We try to talk a protocol version as low as possible so that we can be
+    // backward compatible as far as possible.
+    byte version = (byte) 1;
+    if (effectiveTS != HConstants.LATEST_TIMESTAMP) {
+      version = FLASHBACK_VERSION;
+    } else if (this.maxResponseSize
+        != HConstants.DEFAULT_HBASE_SCANNER_MAX_RESULT_SIZE) {
+      version = (byte) RESPONSE_SIZE_VERSION;
     } else if (this.storeOffset != 0) {
-      version = (byte)VERSION_STORE_OFFSET;
+      version = STORE_OFFSET_VERSION;
     } else if (this.storeLimit != -1) {
-      version = (byte)VERSION_STORE_LIMIT;
+      version = STORE_LIMIT_VERSION;
     }
 
     out.writeByte(version);
@@ -690,16 +719,19 @@ public class Scan extends Operation impl
     Bytes.writeByteArray(out, this.stopRow);
     out.writeInt(this.maxVersions);
     out.writeInt(this.batch);
-    if (version >= VERSION_STORE_LIMIT) {
+    if (version >= STORE_LIMIT_VERSION) {
       out.writeInt(this.storeLimit);
     }
-    if (version >= VERSION_STORE_OFFSET) {
+    if (version >= STORE_OFFSET_VERSION) {
       out.writeInt(this.storeOffset);
     }
-    if (version >= VERSION_RESPONSE_SIZE) {
+    if (version >= RESPONSE_SIZE_VERSION) {
       out.writeInt(this.maxResponseSize);
       out.writeBoolean(this.partialRow);
     }
+    if (version >= FLASHBACK_VERSION) {
+      out.writeLong(effectiveTS);
+    }
     out.writeInt(this.caching);
     out.writeBoolean(this.cacheBlocks);
     if(this.filter == null) {

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1410118&r1=1410117&r2=1410118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Fri Nov 16 00:02:44 2012
@@ -77,6 +77,8 @@ public class ScanQueryMatcher {
 
   private final long oldestFlashBackTS;
 
+  private final long effectiveTS;
+
   /**
    * Constructs a ScanQueryMatcher for a Scan.
    * @param scan
@@ -111,8 +113,9 @@ public class ScanQueryMatcher {
     this.rowComparator = rowComparator;
     this.deletes =  new ScanDeleteTracker();
     this.stopRow = scan.getStopRow();
-    this.startKey = KeyValue.createFirstDeleteFamilyOnRow(scan.getStartRow(),
-        family);
+    this.effectiveTS = scan.getEffectiveTS();
+    this.startKey = KeyValue.createDeleteFamilyOnRow(scan.getStartRow(),
+        family, effectiveTS);
     this.filter = scan.getFilter();
     this.retainDeletesInOutputUntil = retainDeletesInOutputUntil;
     this.maxReadPointToTrackVersions = readPointToUse;
@@ -171,6 +174,9 @@ public class ScanQueryMatcher {
    *      caused by a data corruption.
    */
   public MatchCode match(KeyValue kv) throws IOException {
+    if (kv.getTimestamp() > effectiveTS) {
+      return MatchCode.SEEK_TO_EFFECTIVE_TS;
+    }
     if (filter != null && filter.filterAllRemaining()) {
       return MatchCode.DONE_SCAN;
     }
@@ -389,6 +395,10 @@ public class ScanQueryMatcher {
         null, 0, 0);
   }
 
+  public KeyValue getKeyForEffectiveTSOnRow(KeyValue kv) {
+    return kv.createFirstOnRowColTS(effectiveTS); // reseek target
+  }
+
   /**
    * {@link #match} return codes.  These instruct the scanner moving through
    * memstores and StoreFiles what to do with the current KeyValue.
@@ -440,6 +450,8 @@ public class ScanQueryMatcher {
      */
     SEEK_NEXT_USING_HINT,
 
+    SEEK_TO_EFFECTIVE_TS,
+
     /**
      * Include KeyValue and done with column, seek to next.
      */

Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1410118&r1=1410117&r2=1410118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Fri Nov 16 00:02:44 2012
@@ -93,7 +93,9 @@ public class StoreScanner extends NonLaz
     this.scan = scan;
     this.keyValueAggregator = keyValueAggregator;
     this.columns = columns;
-    oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl
+    long currentTime = (scan.getEffectiveTS() == HConstants.LATEST_TIMESTAMP) ? EnvironmentEdgeManager
+        .currentTimeMillis() : scan.getEffectiveTS();
+    oldestUnexpiredTS = currentTime - ttl
         - flashBackQueryLimit;
 
     // We look up row-column Bloom filters for multi-column queries as part of
@@ -398,6 +400,10 @@ public class StoreScanner extends NonLaz
         }
 
         switch(qcode) {
+
+          case SEEK_TO_EFFECTIVE_TS:
+            reseek(matcher.getKeyForEffectiveTSOnRow(kv));
+            break;
           case INCLUDE:
           case INCLUDE_AND_SEEK_NEXT_ROW:
           case INCLUDE_AND_SEEK_NEXT_COL:

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/FlashBackQueryTestUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/FlashBackQueryTestUtil.java?rev=1410118&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/FlashBackQueryTestUtil.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/FlashBackQueryTestUtil.java Fri Nov 16 00:02:44 2012
@@ -0,0 +1,197 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
+import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
+
+import org.junit.AfterClass;
+import static org.junit.Assert.*;
+import org.junit.BeforeClass;
+
+public class FlashBackQueryTestUtil {
+
+  private static final Log LOG = LogFactory
+      .getLog(FlashBackQueryTestUtil.class);
+  protected static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  protected static Random random = new Random();
+  protected static final int CURRENT_TIME = 60000;
+  protected static final int MAX_MAXVERSIONS = 100;
+  protected static final byte[][] families = new byte[][] { "a".getBytes(),
+      "b".getBytes(), "c".getBytes(), "d".getBytes(), "e".getBytes(),
+      "f".getBytes(), "g".getBytes(), "h".getBytes(), "i".getBytes(),
+      "j".getBytes() };
+
+  protected static class KVComparator implements Comparator<KeyValue> {
+    /** Orders KeyValues by ascending timestamp; nothing else is compared. */
+    @Override
+    public int compare(KeyValue kv1, KeyValue kv2) {
+      long ts1 = kv1.getTimestamp(), ts2 = kv2.getTimestamp();
+      return ts1 < ts2 ? -1 : (ts1 > ts2 ? 1 : 0);
+    }
+  }
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Inject a manual clock edge set to CURRENT_TIME, then start the cluster.
+    ManualEnvironmentEdge mock = new ManualEnvironmentEdge();
+    mock.setValue(CURRENT_TIME);
+    EnvironmentEdgeManagerTestHelper.injectEdge(mock);
+    TEST_UTIL.startMiniCluster(1);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try { TEST_UTIL.shutdownMiniCluster(); }
+    finally { EnvironmentEdgeManagerTestHelper.reset(); } // even if shutdown throws
+  }
+
+  protected HTable setupTable(int ttl, int flashBackQueryLimit, int maxVersions)
+      throws Exception {
+    LOG.info("maxVersions : " + maxVersions);
+    LOG.info("TTL : " + ttl);
+    LOG.info("FBQ: " + flashBackQueryLimit);
+    byte[] tableName = ("loadTable" + random.nextInt()).getBytes();
+    int numFamilies = random.nextInt(families.length - 1) + 1; // 1..length-1
+    HColumnDescriptor[] hcds = new HColumnDescriptor[numFamilies];
+    for (int i = 0; i < numFamilies; i++) {
+      hcds[i] = new HColumnDescriptor(families[i]);
+      hcds[i].setTimeToLive(ttl);
+      hcds[i].setFlashBackQueryLimit(flashBackQueryLimit);
+      hcds[i].setMaxVersions(maxVersions);
+    }
+    return TEST_UTIL.createTable(tableName, hcds);
+  }
+
+  protected void flushAllRegions() throws Exception {
+    HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+    for (HRegion region : server.getOnlineRegions()) {
+      server.flushRegion(region.getRegionName()); // not filtered by table
+    }
+  }
+
+  /**
+   * Builds a KeyValue with a random 10-byte value, a null qualifier and a
+   * timestamp in [start, start + size). When isDelete is set, the type is
+   * chosen at random between DeleteColumn and DeleteFamily; otherwise the
+   * constructor without an explicit type is used.
+   */
+  protected KeyValue getRandomKv(byte[] row, HColumnDescriptor hcd, int start,
+      int size, boolean isDelete) {
+    byte[] payload = new byte[10];
+    random.nextBytes(payload);
+    long ts = start + random.nextInt(size);
+    if (!isDelete) {
+      return new KeyValue(row, hcd.getName(), null, ts, payload);
+    }
+    Type type = random.nextBoolean() ? Type.DeleteColumn : Type.DeleteFamily;
+    return new KeyValue(row, hcd.getName(), null, ts, type, payload);
+  }
+
+  /** Calls compactRecentForTesting(-1) on tableName's stores for hcds. */
+  protected void majorCompact(byte[] tableName, HColumnDescriptor[] hcds)
+      throws Exception {
+    HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+    String wanted = new String(tableName);
+    for (HRegion region : server.getOnlineRegions()) {
+      String actual = new String(region.getTableDesc().getName());
+      if (!actual.equals(wanted)) {
+        LOG.info("Skipping since name is : " + actual);
+        continue;
+      }
+      for (HColumnDescriptor hcd : hcds) {
+        region.getStore(hcd.getName()).compactRecentForTesting(-1);
+      }
+    }
+  }
+
+  protected void processHeapKvs(
+      HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvs,
+      byte[] row, HColumnDescriptor hcd, KeyValue kv) {
+    // Model of expected data: row -> family -> timestamp-ordered queue.
+    String rowKey = new String(row);
+    String familyKey = new String(hcd.getName());
+    HashMap<String, PriorityQueue<KeyValue>> byFamily = heapKvs.get(rowKey);
+    if (byFamily == null) {
+      byFamily = new HashMap<String, PriorityQueue<KeyValue>>();
+      heapKvs.put(rowKey, byFamily);
+    }
+    PriorityQueue<KeyValue> versions = byFamily.get(familyKey);
+    if (kv.isDelete()) {
+      if (versions != null) {
+        LOG.info("Clearing out at : " + kv);
+        versions.clear(); // timestamps appear in increasing order
+      }
+      return;
+    }
+    if (versions == null) {
+      versions = new PriorityQueue<KeyValue>(1, new KVComparator());
+      byFamily.put(familyKey, versions);
+    }
+    versions.add(kv);
+    LOG.info("Added kv : " + kv);
+    if (versions.size() > hcd.getMaxVersions()) {
+      versions.poll(); // evict the lowest timestamp
+    }
+  }
+
+  // def=true: restore each store's TTL/maxVersions from hcds; false: no limits.
+  protected void setStoreProps(byte[] tableName, HColumnDescriptor[] hcds,
+      boolean def) {
+    HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
+    for (HRegion region : server.getOnlineRegions()) {
+      String regionTable = new String(region.getTableDesc().getName());
+      if (!regionTable.equals(new String(tableName))) {
+        LOG.info("Skipping since name is : " + regionTable);
+        continue;
+      }
+      for (HColumnDescriptor hcd : hcds) {
+        Store store = region.getStore(hcd.getName());
+        // Reset back to original values.
+        if (def) {
+          store.ttl = hcd.getTimeToLive() * 1000L; // long math: no int overflow
+          store.getFamily().setMaxVersions(hcd.getMaxVersions());
+        } else {
+          store.ttl = HConstants.FOREVER;
+          store.getFamily().setMaxVersions(Integer.MAX_VALUE);
+        }
+      }
+    }
+  }
+
+  protected boolean inHeapKvs(KeyValue kv,
+      HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvs) {
+    HashMap<String, PriorityQueue<KeyValue>> rowMap = heapKvs.get(new String(kv
+        .getRow()));
+    PriorityQueue<KeyValue> q = (rowMap == null) ? null
+        : rowMap.get(new String(kv.getFamily()));
+    return q != null && q.contains(kv); // unknown row or family: not present
+  }
+
+  protected void verifyHeapKvs(
+      HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvs,
+      HashSet<KeyValue> tableSet) {
+    for (HashMap<String, PriorityQueue<KeyValue>> rowMap : heapKvs.values()) {
+      for (PriorityQueue<KeyValue> q : rowMap.values()) {
+        for (KeyValue kv : q) { // each expected KV must be in tableSet
+          assertTrue("KV in heapKvs: " + kv + " does not exist in table",
+              tableSet.contains(kv));
+        }
+      }
+    }
+  }
+
+}

Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQuery.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQuery.java?rev=1410118&view=auto
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQuery.java (added)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQuery.java Fri Nov 16 00:02:44 2012
@@ -0,0 +1,219 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.PriorityQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+public class TestFlashBackQuery extends FlashBackQueryTestUtil {
+  private static Log LOG = LogFactory.getLog(TestFlashBackQuery.class);
+  private static final HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvs = new HashMap<String, HashMap<String, PriorityQueue<KeyValue>>>();
+
+  /** Scans all versions as of effectiveTS; returns KV count per timestamp. */
+  private HashMap<Long, Integer> getMap(long effectiveTS, HTable table)
+      throws Exception {
+    Scan scan = new Scan();
+    scan.setMaxVersions();
+    scan.setEffectiveTS(effectiveTS);
+    HashMap<Long, Integer> tableMap = new HashMap<Long, Integer>();
+    for (Result result : table.getScanner(scan)) {
+      for (KeyValue kv : result.list()) {
+        Integer seen = tableMap.get(kv.getTimestamp());
+        tableMap.put(kv.getTimestamp(), (seen == null) ? 1 : seen + 1);
+        LOG.info("Got kv : " + kv + " effective : " + effectiveTS);
+      }
+    }
+    return tableMap;
+  }
+
+  private void verify(HTable table) throws Exception {
+    HashMap<Long, Integer> tableMap = getMap(HConstants.LATEST_TIMESTAMP, table);
+    // No flashback: expect 10 KVs each at timestamps 50000 and 40000.
+    assertEquals(10, (int) tableMap.get(50000L));
+    assertEquals(10, (int) tableMap.get(40000L));
+    verify1(table);
+  }
+
+  private void verify1(HTable table) throws Exception {
+    verify1(table, 0); // no 30000 versions were deleted
+  }
+
+  private void verify1(HTable table, int deleteVersions) throws Exception {
+    // Flash back to ts 35000 and check the 30000 and 20000 versions.
+    HashMap<Long, Integer> tableMap = getMap(35000, table);
+    int expectedAt30000 = 10 - deleteVersions;
+    assertEquals(expectedAt30000, (int) tableMap.get(30000L));
+    assertEquals(10, (int) tableMap.get(20000L));
+  }
+
+  @Test
+  public void testTTL() throws Exception {
+    HTable table = setupTable(20, 30, 10);
+    byte[] row = new byte[10];
+    for (int i = 0; i < 10; i++) {
+      random.nextBytes(row);
+      Put put = new Put(row);
+      put.add(new KeyValue(row, families[0], families[0], 50000, row));
+      put.add(new KeyValue(row, families[0], families[0], 40000, row));
+      put.add(new KeyValue(row, families[0], families[0], 30000, row));
+      put.add(new KeyValue(row, families[0], families[0], 20000, row));
+      put.add(new KeyValue(row, families[0], families[0], 10000, row));
+      table.put(put);
+      // A get flashed back to 35000 must return only the 30000 version.
+      Get get = new Get(row);
+      get.addColumn(families[0], families[0]);
+      get.setEffectiveTS(35000);
+      List<KeyValue> kvs = table.get(get).list();
+      assertEquals(1, kvs.size());
+      assertEquals(new KeyValue(row, families[0], families[0], 30000, row),
+          kvs.get(0));
+    }
+    // Same expectations before the flush, after it, and after compaction.
+    verify(table);
+    flushAllRegions();
+    verify(table);
+    majorCompact(table.getTableName(), table.getTableDescriptor()
+        .getColumnFamilies());
+    verify(table);
+  }
+
+  @Test
+  public void testMaxVersions() throws Exception {
+    HTable table = setupTable(60, 60, 2);
+    byte[] row = new byte[10];
+    for (int i = 0; i < 10; i++) {
+      random.nextBytes(row);
+      Put put = new Put(row);
+      put.add(new KeyValue(row, families[0], families[0], 50000, row));
+      put.add(new KeyValue(row, families[0], families[0], 40000, row));
+      put.add(new KeyValue(row, families[0], families[0], 30000, row));
+      put.add(new KeyValue(row, families[0], families[0], 20000, row));
+      put.add(new KeyValue(row, families[0], families[0], 10000, row));
+      table.put(put);
+      // A get flashed back to 35000 must return only the 30000 version.
+      Get get = new Get(row);
+      get.addColumn(families[0], families[0]);
+      get.setEffectiveTS(35000);
+      List<KeyValue> kvs = table.get(get).list();
+      assertEquals(1, kvs.size());
+      assertEquals(new KeyValue(row, families[0], families[0], 30000, row),
+          kvs.get(0));
+    }
+    // Same expectations before the flush, after it, and after compaction.
+    verify(table);
+    flushAllRegions();
+    verify(table);
+    majorCompact(table.getTableName(), table.getTableDescriptor()
+        .getColumnFamilies());
+    verify(table);
+  }
+
+  @Test
+  public void testDeletes() throws Exception {
+    HTable table = setupTable(20, 30, 2);
+    byte[] row = new byte[10];
+    int deleteVersions = 0; // rows whose 30000 version gets deleted
+    for (int i = 0; i < 10; i++) {
+      random.nextBytes(row);
+      Put put = new Put(row);
+      put.add(new KeyValue(row, families[0], null, 50000, row));
+      put.add(new KeyValue(row, families[0], null, 40000, row));
+      put.add(new KeyValue(row, families[0], null, 30000, row));
+      put.add(new KeyValue(row, families[0], null, 20000, row));
+      put.add(new KeyValue(row, families[0], null, 10000, row));
+      table.put(put);
+      Delete delete = new Delete(row);
+      int n = random.nextInt(4); // pick one of four delete flavours
+      if (n == 0) {
+        delete.deleteFamily(families[0], 40000);
+      } else if (n == 1) {
+        delete.deleteRow(40000);
+      } else if (n == 2) {
+        delete.deleteColumns(families[0], null, 40000);
+      } else {
+        deleteVersions++; // only this delete is stamped <= 35000 (verify1's ts)
+        delete.deleteColumn(families[0], null, 30000);
+      }
+      table.delete(delete);
+    }
+    verify1(table, deleteVersions);
+    flushAllRegions();
+    verify1(table, deleteVersions);
+    majorCompact(table.getTableName(), table.getTableDescriptor()
+        .getColumnFamilies());
+    verify1(table, deleteVersions);
+  }
+
+  /** Makes a random KV; records it in heapKvs when ts <= effectiveTS. */
+  private KeyValue processKV(byte[] row, HColumnDescriptor hcd, int start,
+      int size, long effectiveTS) {
+    KeyValue kv = getRandomKv(row, hcd, start, size, random.nextBoolean());
+    if (kv.getTimestamp() <= effectiveTS) {
+      processHeapKvs(heapKvs, row, hcd, kv);
+    }
+    return kv;
+  }
+
+  private void verifyRandom(long effectiveTS, HTable table) throws Exception {
+    Scan scan = new Scan();
+    scan.setMaxVersions(Integer.MAX_VALUE);
+    scan.setEffectiveTS(effectiveTS);
+    HashSet<KeyValue> tableSet = new HashSet<KeyValue>();
+    for (Result result : table.getScanner(scan)) {
+      for (KeyValue kv : result.list()) {
+        assertTrue("KV : " + kv + " should not exist", inHeapKvs(kv, heapKvs));
+        tableSet.add(kv);
+      }
+    }
+    // And the reverse: every KV in the model must have been scanned.
+    verifyHeapKvs(heapKvs, tableSet);
+  }
+
+  @Test
+  public void testRandom() throws Exception {
+    int ttl = random.nextInt(CURRENT_TIME/1000);
+    int flashBackQueryLimit = random.nextInt(CURRENT_TIME/1000);
+    int maxVersions = random.nextInt(MAX_MAXVERSIONS);
+    long effectiveTS = CURRENT_TIME
+        - random.nextInt(flashBackQueryLimit * 1000 + 1); // inside FBQ window
+    LOG.info("PARAMS : " + ttl + " : " + flashBackQueryLimit + " : "
+        + maxVersions + " : " + effectiveTS);
+    HTable table = setupTable(ttl, flashBackQueryLimit, maxVersions);
+    HColumnDescriptor[] hcds = table.getTableDescriptor().getColumnFamilies();
+    byte[] row = new byte[10];
+    for (int i = 0; i < 10; i++) {
+      random.nextBytes(row);
+      Put put = new Put(row);
+      int size = CURRENT_TIME / MAX_MAXVERSIONS; // timestamp bucket width
+      for (HColumnDescriptor hcd : hcds) {
+        for (int versions = 0, start = 0; versions < MAX_MAXVERSIONS; versions++, start += size) {
+          put.add(this.processKV(row, hcd, start, size, effectiveTS));
+        }
+      }
+      table.put(put);
+    }
+    verifyRandom(effectiveTS, table);
+    flushAllRegions();
+    verifyRandom(effectiveTS, table);
+    majorCompact(table.getTableName(), table.getTableDescriptor()
+        .getColumnFamilies());
+    verifyRandom(effectiveTS, table);
+  }
+}

Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQueryCompaction.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQueryCompaction.java?rev=1410118&r1=1410117&r2=1410118&view=diff
==============================================================================
--- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQueryCompaction.java (original)
+++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/regionserver/TestFlashBackQueryCompaction.java Fri Nov 16 00:02:44 2012
@@ -19,17 +19,15 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
-import java.util.Comparator;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.PriorityQueue;
-import java.util.Random;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.Type;
 import org.apache.hadoop.hbase.client.Delete;
@@ -37,87 +35,23 @@ import org.apache.hadoop.hbase.client.HT
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.InjectionEvent;
 import org.apache.hadoop.hbase.util.InjectionHandler;
-import org.apache.hadoop.hbase.util.ManualEnvironmentEdge;
 
-import org.junit.AfterClass;
 import static org.junit.Assert.*;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
-public class TestFlashBackQueryCompaction {
+public class TestFlashBackQueryCompaction extends FlashBackQueryTestUtil {
 
-  private static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private static Random random = new Random();
-  private static final int CURRENT_TIME = 60000;
-  private static final int MAX_MAXVERSIONS = 100;
   private static final HashSet<KeyValue> goodKvs = new HashSet<KeyValue>();
   private static final HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvs = new HashMap<String, HashMap<String, PriorityQueue<KeyValue>>>();
   private static final HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvsNormalScan = new HashMap<String, HashMap<String, PriorityQueue<KeyValue>>>();
   private static final Log LOG = LogFactory
       .getLog(TestFlashBackQueryCompaction.class);
-  private static final byte[][] families = new byte[][] { "a".getBytes(),
-      "b".getBytes(), "c".getBytes(), "d".getBytes(), "e".getBytes(),
-      "f".getBytes(), "g".getBytes(), "h".getBytes(), "i".getBytes(),
-      "j".getBytes() };
-
-  private static class KVComparator implements Comparator<KeyValue> {
-
-    @Override
-    public int compare(KeyValue kv1, KeyValue kv2) {
-      return (kv1.getTimestamp() < kv2.getTimestamp() ? -1 : (kv1
-          .getTimestamp() > kv2.getTimestamp() ? 1 : 0));
-    }
-  }
-
-  @BeforeClass
-  public static void setUpBeforeClass() throws Exception {
-
-    ManualEnvironmentEdge mock = new ManualEnvironmentEdge();
-    mock.setValue(CURRENT_TIME);
-    EnvironmentEdgeManagerTestHelper.injectEdge(mock);
-    TEST_UTIL.startMiniCluster(1);
-  }
-
-  @AfterClass
-  public static void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-    EnvironmentEdgeManagerTestHelper.reset();
-  }
-
-  private void processHeapKvs(
-      HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvs,
-      byte[] row, HColumnDescriptor hcd, KeyValue kv) {
-    HashMap<String, PriorityQueue<KeyValue>> rowMap = heapKvs.get(new String(
-        row));
-    if (rowMap == null) {
-      rowMap = new HashMap<String, PriorityQueue<KeyValue>>();
-      heapKvs.put(new String(row), rowMap);
-    }
-    PriorityQueue<KeyValue> q = rowMap.get(new String(hcd.getName()));
-    if (q == null) {
-      q = new PriorityQueue<KeyValue>(1, new KVComparator());
-      rowMap.put(new String(hcd.getName()), q);
-    }
-    q.add(kv);
-    if (q.size() > hcd.getMaxVersions()) {
-      q.poll();
-    }
-  }
-
-  private KeyValue getRandomKv(byte[] row, HColumnDescriptor hcd, int start,
-      int size) {
-    byte[] value = new byte[10];
-    random.nextBytes(value);
-    long ts = start + random.nextInt(size);
-    return new KeyValue(row, hcd.getName(), null, ts, value);
-  }
 
   private KeyValue processKV(byte[] row, HColumnDescriptor hcd, int start,
       int size) {
-    KeyValue kv = getRandomKv(row, hcd, start, size);
+    KeyValue kv = getRandomKv(row, hcd, start, size, false);
     if (kv.getTimestamp() >= CURRENT_TIME - hcd.getTimeToLive() * 1000) {
       processHeapKvs(heapKvsNormalScan, row, hcd, kv);
     }
@@ -133,30 +67,6 @@ public class TestFlashBackQueryCompactio
     return kv;
   }
 
-  private HTable setupTable(int ttl, int flashBackQueryLimit, int maxVersions)
-      throws Exception {
-    LOG.info("maxVersions : " + maxVersions);
-    LOG.info("TTL : " + ttl);
-    LOG.info("FBQ: " + flashBackQueryLimit);
-    byte[] tableName = ("loadTable" + random.nextInt()).getBytes();
-    HColumnDescriptor[] hcds = new HColumnDescriptor[random
-        .nextInt(families.length - 1) + 1];
-    for (int i = 0; i < hcds.length; i++) {
-      hcds[i] = new HColumnDescriptor(families[i]);
-      hcds[i].setTimeToLive(ttl);
-      hcds[i].setFlashBackQueryLimit(flashBackQueryLimit);
-      hcds[i].setMaxVersions(maxVersions);
-    }
-    return TEST_UTIL.createTable(tableName, hcds);
-  }
-
-  private void flushAllRegions() throws Exception {
-    HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
-    for (HRegion region : server.getOnlineRegions()) {
-      server.flushRegion(region.getRegionName());
-    }
-  }
-
   private void loadTable(int ttl, int flashBackQueryLimit, int maxVersions)
       throws Exception {
     HTable table = setupTable(ttl, flashBackQueryLimit, maxVersions);
@@ -219,69 +129,6 @@ public class TestFlashBackQueryCompactio
     verifyHeapKvs(heapKvsNormalScan, tableSet);
   }
 
-  private void majorCompact(byte[] tableName, HColumnDescriptor[] hcds)
-      throws Exception {
-    HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
-    for (HRegion region : server.getOnlineRegions()) {
-      if (!new String(region.getTableDesc().getName()).equals(new String(
-          tableName))) {
-        LOG.info("Skipping since name is : "
-            + new String(region.getTableDesc().getName()));
-        continue;
-      }
-      for (HColumnDescriptor hcd : hcds) {
-        Store store = region.getStore(hcd.getName());
-        store.compactRecentForTesting(-1);
-      }
-    }
-  }
-
-  private void setStoreProps(byte[] tableName, HColumnDescriptor[] hcds,
-      boolean def) {
-    HRegionServer server = TEST_UTIL.getHBaseCluster().getRegionServer(0);
-    for (HRegion region : server.getOnlineRegions()) {
-      if (!new String(region.getTableDesc().getName()).equals(new String(
-          tableName))) {
-        LOG.info("Skipping since name is : "
-            + new String(region.getTableDesc().getName()));
-        continue;
-      }
-      for (HColumnDescriptor hcd : hcds) {
-        Store store = region.getStore(hcd.getName());
-        // Reset back to original values.
-        if (def) {
-          store.ttl = hcd.getTimeToLive() * 1000;
-          store.getFamily().setMaxVersions(hcd.getMaxVersions());
-        } else {
-          store.ttl = HConstants.FOREVER;
-          store.getFamily().setMaxVersions(Integer.MAX_VALUE);
-        }
-      }
-    }
-  }
-
-  private void verifyHeapKvs(
-      HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvs,
-      HashSet<KeyValue> tableSet) {
-    for (HashMap<String, PriorityQueue<KeyValue>> rowMap : heapKvs.values()) {
-      for (PriorityQueue<KeyValue> q : rowMap.values()) {
-        for (KeyValue kv : q) {
-          assertTrue("KV in heapKvs: " + kv + " does not exist in table",
-              tableSet.contains(kv));
-        }
-      }
-    }
-  }
-
-  private boolean inHeapKvs(KeyValue kv,
-      HashMap<String, HashMap<String, PriorityQueue<KeyValue>>> heapKvs) {
-    HashMap<String, PriorityQueue<KeyValue>> rowMap = heapKvs.get(new String(kv
-        .getRow()));
-    if (rowMap == null)
-      return false;
-    return rowMap.get(new String(kv.getFamily())).contains(kv);
-  }
-
   @Test
   public void testRandom() throws Exception {
     loadTable(random.nextInt(CURRENT_TIME) / 1000,