You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:23:33 UTC

svn commit: r1181593 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/KeyValue.java main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java

Author: nspiegelberg
Date: Tue Oct 11 02:23:33 2011
New Revision: 1181593

URL: http://svn.apache.org/viewvc?rev=1181593&view=rev
Log:
Multi-column scanner unit test

Summary:
This is the unit test part of D276188 ("Optimize multi-column scans using Bloom
filters"). Unlike D276188, this version of TestMultiColumnScanner also tests
deletes. A couple of additional comments and an exception in a case that should
not happen are also included.

After a discussion with Kannan and Liyin we decided to go with Liyin's version
of multi-column scanner Bloom filter optimization (D276469, "Optimize multiple
column get operation by taking advantage of bloomfilters"), provided that we
address a couple of issues, such as avoiding seeking in store files that don't
contain the first column in the beginning. However, this unit test is still
relevant and we will use it to test the changes in D276469.

Test Plan: Run all unit tests, including the new one. Load testing in
HBaseTest.
Reviewed By: kannan
Reviewers: kannan, liyintang
CC: , hbase@lists, kannan
Revert Plan: OK
Differential Revision: 277037

Added:
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1181593&r1=1181592&r2=1181593&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/KeyValue.java Tue Oct 11 02:23:33 2011
@@ -154,6 +154,13 @@ public class KeyValue implements Writabl
    * enum ordinals . They change if item is removed or moved.  Do our own codes.
    */
   public static enum Type {
+    /**
+     * The minimum type. The latest type in the sorted order out of all
+     * key-values for the same row/column/timestamp combination. See
+     * {@link #createLastOnRow} functions. The minimum key type is actually
+     * greater than all other types, as compared by
+     * {@link KeyComparator#compare(byte[], int, int, byte[], int, int)}.
+     */
     Minimum((byte)0),
     Put((byte)4),
 
@@ -161,7 +168,13 @@ public class KeyValue implements Writabl
     DeleteColumn((byte)12),
     DeleteFamily((byte)14),
 
-    // Maximum is used when searching; you look from maximum on down.
+    /**
+     * Maximum is used when searching; you look from maximum on down. The
+     * earliest type in the sorted order for the same row/column/timestamp. See
+     * {@link #createFirstOnRow} functions. The maximum key type is actually
+     * smaller than all other types, as compared by
+     * {@link KeyComparator#compare(byte[], int, int, byte[], int, int)}.
+     */
     Maximum((byte)255);
 
     private final byte code;
@@ -1880,8 +1893,14 @@ public class KeyValue implements Writabl
       byte ltype = left[loffset + (llength - 1)];
       byte rtype = right[roffset + (rlength - 1)];
 
+      // If the column is not specified, the "minimum" key type appears
+      // the latest in the sorted order, regardless of the timestamp. This is
+      // used for specifying the last key/value in a given row, because there
+      // is no "lexicographically last column" (it would be infinitely long).
+      // The "maximum" key type does not need this behavior.
       if (lcolumnlength == 0 && ltype == Type.Minimum.getCode()) {
-        return 1; // left is bigger.
+        // left is "bigger", i.e. it appears later in the sorted order
+        return 1;
       }
       if (rcolumnlength == 0 && rtype == Type.Minimum.getCode()) {
         return -1;
@@ -1908,7 +1927,9 @@ public class KeyValue implements Writabl
 
       if (!this.ignoreType) {
         // Compare types. Let the delete types sort ahead of puts; i.e. types
-        // of higher numbers sort before those of lesser numbers
+        // of higher numbers sort before those of lesser numbers. Maximum (255)
+        // appears ahead of everything, and minimum (0) appears after
+        // everything.
         return (0xff & rtype) - (0xff & ltype);
       }
       return 0;

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java?rev=1181593&r1=1181592&r2=1181593&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanDeleteTracker.java Tue Oct 11 02:23:33 2011
@@ -130,7 +130,11 @@ public class ScanDeleteTracker implement
         // Next column case.
         deleteBuffer = null;
       } else {
-        //Should never happen, throw Exception
+        throw new IllegalStateException("isDelete failed: deleteBuffer="
+            + Bytes.toStringBinary(deleteBuffer, deleteOffset, deleteLength)
+            + ", qualifier="
+            + Bytes.toStringBinary(buffer, qualifierOffset, qualifierLength)
+            + ", timestamp=" + timestamp + ", comparison result: " + ret);
       }
     }
 

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java?rev=1181593&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java Tue Oct 11 02:23:33 2011
@@ -0,0 +1,338 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Tests optimized scanning of multiple columns.
+ */
+@RunWith(Parameterized.class)
+public class TestMultiColumnScanner {
+
+  private static final Log LOG =
+      LogFactory.getLog(TestMultiColumnScanner.class);
+
+  private static final String FAMILY = "CF";
+  private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
+  private static final int MAX_VERSIONS = 50;
+
+  /**
+   * The size of the column qualifier set used. Increasing this parameter
+   * exponentially increases test time.
+   */
+  private static final int NUM_COLUMNS = 8;
+  /** The largest column bit mask scanned: all NUM_COLUMNS low bits set. */
+  private static final int MAX_COLUMN_BIT_MASK = (1 << NUM_COLUMNS) - 1;
+  private static final int NUM_FLUSHES = 10;
+  private static final int NUM_ROWS = 20;
+
+  /** A large value of type long for use as a timestamp */
+  private static final long BIG_LONG = 9111222333444555666L;
+
+  /**
+   * Timestamps to test with. Cannot use {@link Long#MAX_VALUE} here, because
+   * it will be replaced by a timestamp auto-generated based on the time.
+   */
+  private static final long[] TIMESTAMPS = new long[] {
+      1, 3, 5, Integer.MAX_VALUE, BIG_LONG, Long.MAX_VALUE - 1};
+
+  /** The probability that a column is skipped in a store file. */
+  private static final double COLUMN_SKIP_PROBABILITY = 0.7;
+
+  /** The probability to delete a row/column pair */
+  private static final double DELETE_PROBABILITY = 0.02;
+
+  private final static HBaseTestingUtility TEST_UTIL =
+    new HBaseTestingUtility();
+
+  private Compression.Algorithm comprAlgo;
+  private StoreFile.BloomType bloomType;
+
+  private long lastBlocksRead;
+  private long lastCacheHits;
+
+  // Some static sanity-checking.
+  static {
+    assertTrue(BIG_LONG > 0.9 * Long.MAX_VALUE); // Guard against typos.
+
+    // Ensure TIMESTAMPS are sorted.
+    for (int i = 0; i < TIMESTAMPS.length - 1; ++i)
+      assertTrue(TIMESTAMPS[i] < TIMESTAMPS[i + 1]);
+  }
+
+  @Parameters
+  public static final Collection<Object[]> parameters() {
+    List<Object[]> configurations = new ArrayList<Object[]>();
+    for (Compression.Algorithm comprAlgo :
+         HBaseTestingUtility.COMPRESSION_ALGORITHMS) {
+      for (StoreFile.BloomType bloomType : StoreFile.BloomType.values()) {
+        configurations.add(new Object[] { comprAlgo, bloomType });
+      }
+    }
+    return configurations;
+  }
+
+  public TestMultiColumnScanner(Compression.Algorithm comprAlgo,
+      StoreFile.BloomType bloomType) {
+    this.comprAlgo = comprAlgo;
+    this.bloomType = bloomType;
+  }
+
+  private long getBlocksRead() {
+    return HRegion.getNumericMetric("cf." + FAMILY + ".fsBlockReadCnt");
+  }
+
+  private long getCacheHits() {
+    return HRegion.getNumericMetric("cf." + FAMILY +
+        ".fsBlockReadCacheHitCnt");
+  }
+
+  private void saveBlockStats() {
+    lastBlocksRead = getBlocksRead();
+    lastCacheHits = getCacheHits();
+  }
+
+  private void showBlockStats() {
+    long blocksRead = blocksReadDelta();
+    long cacheHits = cacheHitsDelta();
+    LOG.info("Compression: " + comprAlgo + ", Bloom type: "
+        + bloomType + ", blocks read: " + blocksRead + ", block cache hits: "
+        + cacheHits + ", misses: " + (blocksRead - cacheHits));
+  }
+
+  private long cacheHitsDelta() {
+    return getCacheHits() - lastCacheHits;
+  }
+
+  private long blocksReadDelta() {
+    return getBlocksRead() - lastBlocksRead;
+  }
+
+  @Test
+  public void testMultiColumnScanner() throws IOException {
+    String table = "TestMultiColumnScanner";
+    HColumnDescriptor hcd =
+      new HColumnDescriptor(FAMILY_BYTES, MAX_VERSIONS,
+          comprAlgo.getName(),
+          HColumnDescriptor.DEFAULT_IN_MEMORY,
+          HColumnDescriptor.DEFAULT_BLOCKCACHE,
+          HColumnDescriptor.DEFAULT_TTL,
+          bloomType.toString());
+    HTableDescriptor htd = new HTableDescriptor(table);
+    htd.addFamily(hcd);
+    HRegionInfo info = new HRegionInfo(htd, null, null, false);
+    HRegion region = HRegion.createHRegion(
+        info, HBaseTestingUtility.getTestDir(), TEST_UTIL.getConfiguration());
+    List<String> rows = sequentialStrings("row", NUM_ROWS);
+    List<String> qualifiers = sequentialStrings("qual", NUM_COLUMNS);
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    Set<String> keySet = new HashSet<String>();
+
+    // A map from <row>_<qualifier> to the most recent delete timestamp for
+    // that column.
+    Map<String, Long> lastDelTimeMap = new HashMap<String, Long>();
+
+    Random rand = new Random(29372937L);
+    for (int iFlush = 0; iFlush < NUM_FLUSHES; ++iFlush) {
+      for (String qual : qualifiers) {
+        // This is where we decide to include or not include this column into
+        // this store file, regardless of row and timestamp.
+        if (rand.nextDouble() < COLUMN_SKIP_PROBABILITY)
+          continue;
+
+        byte[] qualBytes = Bytes.toBytes(qual);
+        for (String row : rows) {
+          Put p = new Put(Bytes.toBytes(row));
+          for (long ts : TIMESTAMPS) {
+            String value = createValue(row, qual, ts);
+            KeyValue kv = KeyValueTestUtil.create(row, FAMILY, qual, ts, value);
+            assertEquals(kv.getTimestamp(), ts);
+            p.add(kv);
+            String keyAsString = kv.toString();
+            if (!keySet.contains(keyAsString)) {
+              keySet.add(keyAsString);
+              kvs.add(kv);
+            }
+          }
+          region.put(p);
+
+          Delete d = new Delete(Bytes.toBytes(row));
+          boolean deletedSomething = false;
+          for (long ts : TIMESTAMPS)
+            if (rand.nextDouble() < DELETE_PROBABILITY) {
+              d.deleteColumns(FAMILY_BYTES, qualBytes, ts);
+              String rowAndQual = row +"_" + qual;
+              Long whenDeleted = lastDelTimeMap.get(rowAndQual);
+              lastDelTimeMap.put(rowAndQual, whenDeleted == null ? ts :
+                  Math.max(ts, whenDeleted));
+              deletedSomething = true;
+            }
+          if (deletedSomething)
+            region.delete(d, null, true);
+        }
+      }
+      region.flushcache();
+    }
+
+    saveBlockStats();
+    Collections.sort(kvs, KeyValue.COMPARATOR);
+    for (int maxVersions = 1; maxVersions <= TIMESTAMPS.length; ++maxVersions)
+    {
+      for (int columnBitMask = 1; columnBitMask <= MAX_COLUMN_BIT_MASK;
+           ++columnBitMask) {
+        Scan scan = new Scan();
+        scan.setMaxVersions(maxVersions);
+        Set<String> qualSet = new TreeSet<String>();
+        {
+          int columnMaskTmp = columnBitMask;
+          for (String qual : qualifiers) {
+            if ((columnMaskTmp & 1) != 0) {
+              scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qual));
+              qualSet.add(qual);
+            }
+            columnMaskTmp >>= 1;
+          }
+          assertEquals(0, columnMaskTmp);
+        }
+
+        InternalScanner scanner = region.getScanner(scan);
+        List<KeyValue> results = new ArrayList<KeyValue>();
+
+        int kvPos = 0;
+        int numResults = 0;
+        String queryInfo = "columns queried: " + qualSet + " (columnBitMask="
+            + columnBitMask + "), maxVersions=" + maxVersions;
+
+        while (scanner.next(results) || results.size() > 0) {
+          for (KeyValue kv : results) {
+            while (kvPos < kvs.size()
+                && !matchesQuery(kvs.get(kvPos), qualSet, maxVersions,
+                    lastDelTimeMap)) {
+              ++kvPos;
+            }
+            String rowQual = getRowQualStr(kv);
+            String deleteInfo = "";
+            Long lastDelTS = lastDelTimeMap.get(rowQual);
+            if (lastDelTS != null) {
+              deleteInfo = "; last timestamp when row/column " + rowQual
+                  + " was deleted: " + lastDelTS;
+            }
+            assertTrue("Scanner returned additional key/value: " + kv + ", "
+                + queryInfo + deleteInfo + ";", kvPos < kvs.size());
+            assertEquals("Scanner returned wrong key/value; " + queryInfo
+                + deleteInfo + ";", kvs.get(kvPos), kv);
+            ++kvPos;
+            ++numResults;
+          }
+          results.clear();
+        }
+        for ( ; kvPos < kvs.size(); ++kvPos) {
+          KeyValue remainingKV = kvs.get(kvPos);
+          assertFalse("Matching column not returned by scanner: "
+              + remainingKV + ", " + queryInfo + ", results returned: "
+              + numResults, matchesQuery(remainingKV, qualSet, maxVersions,
+                  lastDelTimeMap));
+        }
+      }
+    }
+    showBlockStats();
+    assertTrue("This test is supposed to delete at least some row/column " +
+        "pairs", lastDelTimeMap.size() > 0);
+    LOG.info("Number of row/col pairs deleted at least once: " +
+       lastDelTimeMap.size());
+  }
+
+  private static String getRowQualStr(KeyValue kv) {
+    String rowStr = Bytes.toString(kv.getBuffer(), kv.getRowOffset(),
+        kv.getRowLength());
+    String qualStr = Bytes.toString(kv.getBuffer(), kv.getQualifierOffset(),
+        kv.getQualifierLength());
+    return rowStr + "_" + qualStr;
+  }
+
+  private static boolean matchesQuery(KeyValue kv, Set<String> qualSet,
+      int maxVersions, Map<String, Long> lastDelTimeMap) {
+    Long lastDelTS = lastDelTimeMap.get(getRowQualStr(kv));
+    long ts = kv.getTimestamp();
+    return qualSet.contains(qualStr(kv)) &&
+        ts >= TIMESTAMPS[TIMESTAMPS.length - maxVersions] &&
+        (lastDelTS == null || ts > lastDelTS);
+  }
+
+  private static String qualStr(KeyValue kv) {
+    return Bytes.toString(kv.getBuffer(), kv.getQualifierOffset(),
+        kv.getQualifierLength());
+  }
+
+  private static String createValue(String row, String qual, long ts) {
+    return "value_for_" + row + "_" + qual + "_" + ts;
+  }
+
+  private static List<String> sequentialStrings(String prefix, int n) {
+    List<String> lst = new ArrayList<String>();
+    for (int i = 0; i < n; ++i) {
+      StringBuilder sb = new StringBuilder();
+      sb.append(prefix + i);
+
+      // Make column length depend on i.
+      int iBitShifted = i;
+      while (iBitShifted != 0) {
+        sb.append((iBitShifted & 1) == 0 ? 'a' : 'b');
+        iBitShifted >>= 1;
+      }
+
+      lst.add(sb.toString());
+    }
+
+    return lst;
+  }
+
+}