Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 19:45:30 UTC

svn commit: r1181981 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/ test/java/org/apache/hadoop/hbase/regionserver/

Author: nspiegelberg
Date: Tue Oct 11 17:45:30 2011
New Revision: 1181981

URL: http://svn.apache.org/viewvc?rev=1181981&view=rev
Log:
A new unit test for lazy seek and StoreScanner in general

Summary:
Adds a randomized unit test for Gets/Scans (all-row, single-row, multi-row,
all-column, single-column, and multi-column), covering all combinations of
Bloom filter types and compression algorithms (NONE vs. GZIP). The test
flushes multiple StoreFiles with disjoint timestamp ranges and runs various
types of queries against them. Overlapping timestamp ranges are not yet
tested.
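
At its core, the test verifies that lazy seek reduces the number of
StoreFileScanner seeks. A minimal sketch of that measurement, using the
two package-private hooks added in this diff (region/scan setup is
elided, and runQuery() is a hypothetical helper that drains one scan):

    // Sketch only: compare seek counts with lazy seek off vs. on.
    // Must live in org.apache.hadoop.hbase.regionserver to see the hooks.
    StoreScanner.enableLazySeekGlobally(false);
    long before = StoreFileScanner.getSeekCount();
    runQuery(region, scan);  // hypothetical helper
    final long diligentSeeks = StoreFileScanner.getSeekCount() - before;

    StoreScanner.enableLazySeekGlobally(true);
    before = StoreFileScanner.getSeekCount();
    runQuery(region, scan);  // same query again, now with lazy seek
    final long lazySeeks = StoreFileScanner.getSeekCount() - before;

    // Lazy seek should never need more seeks than diligent mode.
    assertTrue(lazySeeks <= diligentSeeks);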

This diff was originally part of D326359.

Test Plan: Run the unit test, as well as TestMultiColumnScanner and
TestScanWithBloomError, which share some code.

Reviewers: liyintang, kannan

Reviewed By: kannan

CC: hbase-eng@lists, kannan

Differential Revision: 328766

Revert Plan: OK

Added:
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=1181981&r1=1181980&r2=1181981&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Tue Oct 11 17:45:30 2011
@@ -32,6 +32,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.SortedSet;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * KeyValueScanner adaptor over the Reader.  It also provides hooks into
@@ -45,6 +46,8 @@ class StoreFileScanner implements KeyVal
   private final HFileScanner hfs;
   private KeyValue cur = null;
 
+  private static final AtomicLong seekCount = new AtomicLong();
+
   /**
    * Implements a {@link KeyValueScanner} on top of the specified {@link HFileScanner}
    * @param hfs HFile scanner
@@ -109,6 +112,7 @@ class StoreFileScanner implements KeyVal
   }
 
   public boolean seek(KeyValue key) throws IOException {
+    seekCount.incrementAndGet();
     try {
       if(!seekAtOrAfter(hfs, key)) {
         close();
@@ -122,6 +126,7 @@ class StoreFileScanner implements KeyVal
   }
 
   public boolean reseek(KeyValue key) throws IOException {
+    seekCount.incrementAndGet();
     try {
       if (!reseekAtOrAfter(hfs, key)) {
         close();
@@ -211,4 +216,11 @@ class StoreFileScanner implements KeyVal
   Reader getReaderForTesting() {
     return reader;
   }
+
+  // Test methods
+
+  static final long getSeekCount() {
+    return seekCount.get();
+  }
+
 }
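
Note that seekCount is a single JVM-wide counter, so tests should measure
a delta around the operation under test rather than read the absolute
value; a sketch of the pattern (this is exactly what TestSeekOptimizations
does below):

    long initialSeekCount = StoreFileScanner.getSeekCount();
    // ... run the Get/Scan under test ...
    long seeks = StoreFileScanner.getSeekCount() - initialSeekCount;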

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1181981&r1=1181980&r2=1181981&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Tue Oct 11 17:45:30 2011
@@ -50,6 +50,13 @@ class StoreScanner implements KeyValueSc
   private boolean closing = false;
   private final boolean isGet;
 
+  /** We don't ever expect to change this; the constant is just for clarity. */
+  static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
+
+  /** Used during unit testing to ensure that lazy seek does save seek ops */
+  private static boolean lazySeekEnabledGlobally =
+      LAZY_SEEK_ENABLED_BY_DEFAULT;
+
   // if heap == null and lastTop != null, you need to reseek given the key below
   private KeyValue lastTop = null;
 
@@ -462,4 +469,9 @@ class StoreScanner implements KeyValueSc
       allScanners.add(scanner);
     return allScanners;
   }
+
+  static void enableLazySeekGlobally(boolean enable) {
+    lazySeekEnabledGlobally = enable;
+  }
+
 }
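
Because enableLazySeekGlobally() flips a static flag shared by every
StoreScanner in the JVM, a test that changes it must restore the default
when done; a sketch of the cleanup, mirroring
TestSeekOptimizations.tearDown() below:

    @After
    public void tearDown() {
      // Restore the default so other tests in the JVM are unaffected.
      StoreScanner.enableLazySeekGlobally(
          StoreScanner.LAZY_SEEK_ENABLED_BY_DEFAULT);
    }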

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java?rev=1181981&r1=1181980&r2=1181981&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/HBaseTestingUtility.java Tue Oct 11 17:45:30 2011
@@ -28,6 +28,8 @@ import java.lang.reflect.Field;
 import java.security.MessageDigest;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -55,6 +57,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.InternalScanner;
 import org.apache.hadoop.hbase.regionserver.ReadWriteConsistencyControl;
 import org.apache.hadoop.hbase.regionserver.Store;
+import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.Threads;
@@ -114,6 +117,24 @@ public class HBaseTestingUtility {
         Compression.Algorithm.NONE, Compression.Algorithm.GZ
       };
 
+  /**
+   * Create all combinations of Bloom filters and compression algorithms for
+   * testing.
+   */
+  private static List<Object[]> bloomAndCompressionCombinations() {
+    List<Object[]> configurations = new ArrayList<Object[]>();
+    for (Compression.Algorithm comprAlgo :
+         HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
+      for (StoreFile.BloomType bloomType : StoreFile.BloomType.values()) {
+        configurations.add(new Object[] { comprAlgo, bloomType });
+      }
+    }
+    return Collections.unmodifiableList(configurations);
+  }
+
+  public static final Collection<Object[]> BLOOM_AND_COMPRESSION_COMBINATIONS =
+      bloomAndCompressionCombinations();
+
   public HBaseTestingUtility() {
     this(HBaseConfiguration.create());
   }
@@ -1038,4 +1059,40 @@ public class HBaseTestingUtility {
 
     return getFromStoreFile(store,get);
   }
+
+  public static void assertKVListsEqual(String additionalMsg,
+      final List<KeyValue> expected,
+      final List<KeyValue> actual) {
+    final int eLen = expected.size();
+    final int aLen = actual.size();
+    final int minLen = Math.min(eLen, aLen);
+
+    int i;
+    for (i = 0; i < minLen
+        && KeyValue.COMPARATOR.compare(expected.get(i), actual.get(i)) == 0;
+        ++i) {}
+
+    if (additionalMsg == null) {
+      additionalMsg = "";
+    }
+    if (!additionalMsg.isEmpty()) {
+      additionalMsg = ". " + additionalMsg;
+    }
+
+    if (eLen != aLen || i != minLen) {
+      throw new AssertionError(
+          "Expected and actual KV arrays differ at position " + i + ": " +
+          safeGetAsStr(expected, i) + " (length " + eLen +") vs. " +
+          safeGetAsStr(actual, i) + " (length " + aLen + ")" + additionalMsg);
+    }
+  }
+
+  private static <T> String safeGetAsStr(List<T> lst, int i) {
+    if (0 <= i && i < lst.size()) {
+      return lst.get(i).toString();
+    } else {
+      return "<out_of_range>";
+    }
+  }
+
 }
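
BLOOM_AND_COMPRESSION_COMBINATIONS is meant to be returned directly from
a JUnit @Parameters method. A sketch of a parameterized test class wired
this way (MyStoreTest is a hypothetical name; TestMultiColumnScanner and
the new TestSeekOptimizations below follow this exact pattern):

    import java.util.Collection;

    import org.apache.hadoop.hbase.HBaseTestingUtility;
    import org.apache.hadoop.hbase.io.hfile.Compression;
    import org.apache.hadoop.hbase.regionserver.StoreFile;
    import org.junit.runner.RunWith;
    import org.junit.runners.Parameterized;
    import org.junit.runners.Parameterized.Parameters;

    @RunWith(Parameterized.class)
    public class MyStoreTest {  // hypothetical test class
      private final Compression.Algorithm comprAlgo;
      private final StoreFile.BloomType bloomType;

      @Parameters
      public static Collection<Object[]> parameters() {
        // Runs the test once per (compression, bloom type) pair.
        return HBaseTestingUtility.BLOOM_AND_COMPRESSION_COMBINATIONS;
      }

      public MyStoreTest(Compression.Algorithm comprAlgo,
          StoreFile.BloomType bloomType) {
        this.comprAlgo = comprAlgo;
        this.bloomType = bloomType;
      }
    }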

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java?rev=1181981&r1=1181980&r2=1181981&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java Tue Oct 11 17:45:30 2011
@@ -65,11 +65,11 @@ public class TestMultiColumnScanner {
       LogFactory.getLog(TestMultiColumnScanner.class);
 
   private static final String TABLE_NAME = "TestMultiColumnScanner";
-  private static final int MAX_VERSIONS = 50;
 
-  // These fields are used in TestScanWithBloomError
+  // These fields are used in other unit tests
   static final String FAMILY = "CF";
   static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
+  static final int MAX_VERSIONS = 50;
 
   private final ColumnFamilyMetrics cfMetrics =
       ColumnFamilyMetrics.getInstance(FAMILY);
@@ -126,14 +126,7 @@ public class TestMultiColumnScanner {
 
   @Parameters
   public static final Collection<Object[]> parameters() {
-    List<Object[]> configurations = new ArrayList<Object[]>();
-    for (Compression.Algorithm comprAlgo :
-         HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
-      for (StoreFile.BloomType bloomType : StoreFile.BloomType.values()) {
-        configurations.add(new Object[] { comprAlgo, bloomType });
-      }
-    }
-    return configurations;
+    return HBaseTestingUtility.BLOOM_AND_COMPRESSION_COMBINATIONS;
   }
 
   public TestMultiColumnScanner(Compression.Algorithm comprAlgo,
@@ -175,7 +168,8 @@ public class TestMultiColumnScanner {
 
   @Test
   public void testMultiColumnScanner() throws IOException {
-    HRegion region = createRegion(TABLE_NAME, comprAlgo, bloomType);
+    HRegion region = createRegion(TABLE_NAME, comprAlgo, bloomType,
+        MAX_VERSIONS);
     List<String> rows = sequentialStrings("row", NUM_ROWS);
     List<String> qualifiers = sequentialStrings("qual", NUM_COLUMNS);
     List<KeyValue> kvs = new ArrayList<KeyValue>();
@@ -320,10 +314,10 @@ public class TestMultiColumnScanner {
   }
 
   static HRegion createRegion(String tableName,
-      Compression.Algorithm comprAlgo, BloomType bloomType)
+      Compression.Algorithm comprAlgo, BloomType bloomType, int maxVersions)
       throws IOException {
     HColumnDescriptor hcd =
-      new HColumnDescriptor(FAMILY_BYTES, MAX_VERSIONS,
+      new HColumnDescriptor(FAMILY_BYTES, maxVersions,
           comprAlgo.getName(),
           HColumnDescriptor.DEFAULT_IN_MEMORY,
           HColumnDescriptor.DEFAULT_BLOCKCACHE,
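
With the new maxVersions parameter, callers choose version retention
explicitly; for example, the new TestSeekOptimizations below passes
Integer.MAX_VALUE so that no versions are ever dropped (table name
illustrative; createRegion is package-private, so callers must be in the
same package):

    HRegion region = TestMultiColumnScanner.createRegion(
        "myTable", comprAlgo, bloomType, Integer.MAX_VALUE);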

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java?rev=1181981&r1=1181980&r2=1181981&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java Tue Oct 11 17:45:30 2011
@@ -76,7 +76,8 @@ public class TestScanWithBloomError {
 
   @Test
   public void testThreeStoreFiles() throws IOException {
-    region = createRegion(TABLE_NAME, Compression.Algorithm.GZ, bloomType);
+    region = createRegion(TABLE_NAME, Compression.Algorithm.GZ, bloomType,
+        MAX_VERSIONS);
     createStoreFile(new int[] {1, 2, 6});
     createStoreFile(new int[] {1, 2, 3, 7});
     createStoreFile(new int[] {1, 9});

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java?rev=1181981&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestSeekOptimizations.java Tue Oct 11 17:45:30 2011
@@ -0,0 +1,435 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.apache.hadoop.hbase.HBaseTestingUtility.assertKVListsEqual;
+import static org.apache.hadoop.hbase.regionserver.TestMultiColumnScanner.*;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Test various seek optimizations for correctness and check if they are
+ * actually saving I/O operations.
+ */
+@RunWith(Parameterized.class)
+public class TestSeekOptimizations {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestSeekOptimizations.class);
+
+  // Constants
+  private static final int PUTS_PER_ROW_COL = 50;
+  private static final int DELETES_PER_ROW_COL = 10;
+
+  private static final int NUM_ROWS = 3;
+  private static final int NUM_COLS = 3;
+
+  private static final boolean VERBOSE = false;
+
+  /**
+   * Disable this when this test fails hopelessly and you need to debug a
+   * simpler case.
+   */
+  private static final boolean USE_MANY_STORE_FILES = true;
+
+  private static final int[][] COLUMN_SETS = new int[][] {
+    {},  // All columns
+    {0},
+    {1},
+    {0, 2},
+    {1, 2},
+    {0, 1, 2},
+  };
+
+  // Both start row and end row are inclusive here for the purposes of this
+  // test.
+  private static final int[][] ROW_RANGES = new int[][] {
+    {-1, -1},
+    {0, 1},
+    {1, 1},
+    {1, 2},
+    {0, 2}
+  };
+
+  private static final int[] MAX_VERSIONS_VALUES = new int[] { 1, 2 };
+
+  // Instance variables
+  private HRegion region;
+  private Put put;
+  private Delete del;
+  private Random rand;
+  private Set<Long> putTimestamps = new HashSet<Long>();
+  private Set<Long> delTimestamps = new HashSet<Long>();
+  private List<KeyValue> expectedKVs = new ArrayList<KeyValue>();
+
+  private Compression.Algorithm comprAlgo;
+  private StoreFile.BloomType bloomType;
+
+  private long totalSeekDiligent, totalSeekLazy;
+
+  @Parameters
+  public static final Collection<Object[]> parameters() {
+    return HBaseTestingUtility.BLOOM_AND_COMPRESSION_COMBINATIONS;
+  }
+
+  public TestSeekOptimizations(Compression.Algorithm comprAlgo,
+      StoreFile.BloomType bloomType) {
+    this.comprAlgo = comprAlgo;
+    this.bloomType = bloomType;
+  }
+
+  @Before
+  public void setUp() {
+    rand = new Random(91238123L);
+    expectedKVs.clear();
+  }
+
+  @Test
+  public void testMultipleTimestampRanges() throws IOException {
+    region = TestMultiColumnScanner.createRegion(
+        TestSeekOptimizations.class.getName(), comprAlgo, bloomType,
+        Integer.MAX_VALUE);
+
+    // Delete the given timestamp and everything before.
+    final long latestDelTS = USE_MANY_STORE_FILES ? 1397 : -1;
+
+    createTimestampRange(1, 50, -1);
+    createTimestampRange(51, 100, -1);
+    if (USE_MANY_STORE_FILES) {
+      createTimestampRange(100, 500, 127);
+      createTimestampRange(900, 1300, -1);
+      createTimestampRange(1301, 2500, latestDelTS);
+      createTimestampRange(2502, 2598, -1);
+      createTimestampRange(2599, 2999, -1);
+    }
+
+    prepareExpectedKVs(latestDelTS);
+
+    for (int[] columnArr : COLUMN_SETS) {
+      for (int[] rowRange : ROW_RANGES) {
+        for (int maxVersions : MAX_VERSIONS_VALUES) {
+          for (boolean lazySeekEnabled : new boolean[] { false, true }) {
+            testScan(columnArr, lazySeekEnabled, rowRange[0], rowRange[1],
+                maxVersions);
+          }
+        }
+      }
+    }
+
+    final double seekSavings = 1 - totalSeekLazy * 1.0 / totalSeekDiligent;
+    System.err.println("For bloom=" + bloomType + ", compr=" + comprAlgo +
+        " total seeks without optimization: " + totalSeekDiligent
+        + ", with optimization: " + totalSeekLazy + " (" +
+        String.format("%.2f%%", totalSeekLazy * 100.0 / totalSeekDiligent) +
+        "), savings: " + String.format("%.2f%%",
+            100.0 * seekSavings) + "\n");
+
+    // Test that lazy seeks are buying us something.
+    final double expectedSeekSavings = 0.25;
+    assertTrue("Lazy seek is only saving " +
+        String.format("%.2f%%", seekSavings * 100) + " seeks but should " +
+        "save at least " + String.format("%.2f%%", expectedSeekSavings * 100),
+        seekSavings >= expectedSeekSavings);
+  }
+
+  private void testScan(final int[] columnArr, final boolean lazySeekEnabled,
+      final int startRow, final int endRow, int maxVersions)
+      throws IOException {
+    StoreScanner.enableLazySeekGlobally(lazySeekEnabled);
+    final Scan scan = new Scan();
+    final Set<String> qualSet = new HashSet<String>();
+    for (int iColumn : columnArr) {
+      String qualStr = getQualStr(iColumn);
+      scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qualStr));
+      qualSet.add(qualStr);
+    }
+    scan.setMaxVersions(maxVersions);
+    scan.setStartRow(rowBytes(startRow));
+
+    // Adjust for the fact that for multi-row queries the end row is exclusive.
+    {
+      final byte[] scannerStopRow =
+          rowBytes(endRow + (startRow != endRow ? 1 : 0));
+      scan.setStopRow(scannerStopRow);
+    }
+
+    final long initialSeekCount = StoreFileScanner.getSeekCount();
+    final InternalScanner scanner = region.getScanner(scan);
+    final List<KeyValue> results = new ArrayList<KeyValue>();
+    final List<KeyValue> actualKVs = new ArrayList<KeyValue>();
+
+    // Such a clumsy do-while loop appears to be the official way to use an
+    // InternalScanner: the scanner.next() return value refers to the _next_
+    // result, not to the one already added to results.
+    boolean hasNext;
+    do {
+      hasNext = scanner.next(results);
+      actualKVs.addAll(results);
+      results.clear();
+    } while (hasNext);
+
+    List<KeyValue> filteredKVs = filterExpectedResults(qualSet,
+        rowBytes(startRow), rowBytes(endRow), maxVersions);
+    final String rowRestrictionStr =
+        (startRow == -1 && endRow == -1) ? "all rows" : (
+            startRow == endRow ? ("row=" + startRow) : ("startRow="
+            + startRow + ", " + "endRow=" + endRow));
+    final String columnRestrictionStr =
+        columnArr.length == 0 ? "all columns"
+            : ("columns=" + Arrays.toString(columnArr));
+    final String testDesc =
+        "Bloom=" + bloomType + ", compr=" + comprAlgo + ", "
+            + (scan.isGetScan() ? "Get" : "Scan") + ": "
+            + columnRestrictionStr + ", " + rowRestrictionStr
+            + ", maxVersions=" + maxVersions + ", lazySeek=" + lazySeekEnabled;
+    long seekCount = StoreFileScanner.getSeekCount() - initialSeekCount;
+    System.err.println("Seek count: " + seekCount + ", KVs returned: "
+        + actualKVs.size() + ". " + testDesc +
+        (lazySeekEnabled ? "\n" : ""));
+    if (lazySeekEnabled) {
+      totalSeekLazy += seekCount;
+    } else {
+      totalSeekDiligent += seekCount;
+    }
+    assertKVListsEqual(testDesc, filteredKVs, actualKVs);
+  }
+
+  private List<KeyValue> filterExpectedResults(Set<String> qualSet,
+      byte[] startRow, byte[] endRow, int maxVersions) {
+    final List<KeyValue> filteredKVs = new ArrayList<KeyValue>();
+    final Map<String, Integer> verCount = new HashMap<String, Integer>();
+    for (KeyValue kv : expectedKVs) {
+      if (startRow.length > 0 &&
+          Bytes.compareTo(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
+              startRow, 0, startRow.length) < 0) {
+        continue;
+      }
+
+      // In this unit test the end row is always inclusive.
+      if (endRow.length > 0 &&
+          Bytes.compareTo(kv.getBuffer(), kv.getRowOffset(), kv.getRowLength(),
+              endRow, 0, endRow.length) > 0) {
+        continue;
+      }
+
+      if (!qualSet.isEmpty() && (Bytes.compareTo(
+            kv.getBuffer(), kv.getFamilyOffset(), kv.getFamilyLength(),
+            FAMILY_BYTES, 0, FAMILY_BYTES.length
+          ) != 0 ||
+          !qualSet.contains(Bytes.toString(kv.getQualifier())))) {
+        continue;
+      }
+
+      final String rowColStr =
+        Bytes.toStringBinary(kv.getRow()) + "/"
+            + Bytes.toStringBinary(kv.getFamily()) + ":"
+            + Bytes.toStringBinary(kv.getQualifier());
+      final Integer curNumVer = verCount.get(rowColStr);
+      final int newNumVer = curNumVer != null ? (curNumVer + 1) : 1;
+      if (newNumVer <= maxVersions) {
+        filteredKVs.add(kv);
+        verCount.put(rowColStr, newNumVer);
+      }
+    }
+
+    return filteredKVs;
+  }
+
+  private void prepareExpectedKVs(long latestDelTS) {
+    final List<KeyValue> filteredKVs = new ArrayList<KeyValue>();
+    for (KeyValue kv : expectedKVs) {
+      if (kv.getTimestamp() > latestDelTS || latestDelTS == -1) {
+        filteredKVs.add(kv);
+      }
+    }
+    expectedKVs = filteredKVs;
+    Collections.sort(expectedKVs, KeyValue.COMPARATOR);
+  }
+
+  public void put(String qual, long ts) {
+    if (!putTimestamps.contains(ts)) {
+      put.add(FAMILY_BYTES, Bytes.toBytes(qual), ts, createValue(ts));
+      putTimestamps.add(ts);
+    }
+    if (VERBOSE) {
+      LOG.info("put: row " + Bytes.toStringBinary(put.getRow())
+          + ", cf " + FAMILY + ", qualifier " + qual + ", ts " + ts);
+    }
+  }
+
+  private byte[] createValue(long ts) {
+    return Bytes.toBytes("value" + ts);
+  }
+
+  public void delAtTimestamp(String qual, long ts) {
+    del.deleteColumn(FAMILY_BYTES, Bytes.toBytes(qual), ts);
+    logDelete(qual, ts, "at");
+  }
+
+  private void logDelete(String qual, long ts, String delType) {
+    if (VERBOSE) {
+      LOG.info("del " + delType + ": row "
+          + Bytes.toStringBinary(put.getRow()) + ", cf " + FAMILY
+          + ", qualifier " + qual + ", ts " + ts);
+    }
+  }
+
+  private void delUpToTimestamp(String qual, long upToTS) {
+    del.deleteColumns(FAMILY_BYTES, Bytes.toBytes(qual), upToTS);
+    logDelete(qual, upToTS, "up to and including");
+  }
+
+  private long randLong(long n) {
+    long l = rand.nextLong();
+    if (l == Long.MIN_VALUE)
+      l = Long.MAX_VALUE;
+    return Math.abs(l) % n;
+  }
+
+  private long randBetween(long a, long b) {
+    long x = a + randLong(b - a + 1);
+    assertTrue(a <= x && x <= b);
+    return x;
+  }
+
+  private final String rowStr(int i) {
+    return ("row" + i).intern();
+  }
+
+  private final byte[] rowBytes(int i) {
+    if (i == -1) {
+      return HConstants.EMPTY_BYTE_ARRAY;
+    }
+    return Bytes.toBytes(rowStr(i));
+  }
+
+  private final String getQualStr(int i) {
+    return ("qual" + i).intern();
+  }
+
+  public void createTimestampRange(long minTS, long maxTS,
+      long deleteUpToTS) throws IOException {
+    assertTrue(minTS < maxTS);
+    assertTrue(deleteUpToTS == -1
+        || (minTS <= deleteUpToTS && deleteUpToTS <= maxTS));
+
+    for (int iRow = 0; iRow < NUM_ROWS; ++iRow) {
+      final String row = rowStr(iRow);
+      final byte[] rowBytes = Bytes.toBytes(row);
+      for (int iCol = 0; iCol < NUM_COLS; ++iCol) {
+        final String qual = getQualStr(iCol);
+        final byte[] qualBytes = Bytes.toBytes(qual);
+        put = new Put(rowBytes);
+
+        putTimestamps.clear();
+        put(qual, minTS);
+        put(qual, maxTS);
+        for (int i = 0; i < PUTS_PER_ROW_COL; ++i) {
+          put(qual, randBetween(minTS, maxTS));
+        }
+
+        long[] putTimestampList = new long[putTimestamps.size()];
+        {
+          int i = 0;
+          for (long ts : putTimestamps) {
+            putTimestampList[i++] = ts;
+          }
+        }
+
+        // Delete a predetermined number of particular timestamps
+        delTimestamps.clear();
+        assertTrue(putTimestampList.length >= DELETES_PER_ROW_COL);
+        int numToDel = DELETES_PER_ROW_COL;
+        int tsRemaining = putTimestampList.length;
+        del = new Delete(rowBytes);
+        for (long ts : putTimestampList) {
+          if (rand.nextInt(tsRemaining) < numToDel) {
+            delAtTimestamp(qual, ts);
+            putTimestamps.remove(ts);
+            --numToDel;
+          }
+
+          if (--tsRemaining == 0) {
+            break;
+          }
+        }
+
+        // Another type of delete: everything up to the given timestamp.
+        if (deleteUpToTS != -1) {
+          delUpToTimestamp(qual, deleteUpToTS);
+        }
+
+        region.put(put);
+        if (!del.isEmpty()) {
+          region.delete(del, null, true);
+        }
+
+        // Add remaining timestamps (those we have not deleted) to expected results
+        for (long ts : putTimestamps) {
+          expectedKVs.add(new KeyValue(rowBytes, FAMILY_BYTES, qualBytes, ts,
+              KeyValue.Type.Put));
+        }
+      }
+    }
+
+    region.flushcache();
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    if (region != null) {
+      region.close();
+    }
+
+    // We have to re-set the lazy seek flag back to the default so that other
+    // unit tests are not affected.
+    StoreScanner.enableLazySeekGlobally(
+        StoreScanner.LAZY_SEEK_ENABLED_BY_DEFAULT);
+  }
+
+}