You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 04:24:44 UTC

svn commit: r1181601 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/io/hfile/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/

Author: nspiegelberg
Date: Tue Oct 11 02:24:44 2011
New Revision: 1181601

URL: http://svn.apache.org/viewvc?rev=1181601&view=rev
Log:
Optimize multi-column scans using Bloom filters

Summary: Previously we only used row-column Bloom filters for scans that only
requested one column. We have seen production queries that request up to 200
columns, and with say ~6 store files per store (region / column family
combination) this might have resulted in 1200 block read operations in the worst
case. With this diff we will be avoiding seeks on store files that we know don't
contain the row/column of interest when using an ExplicitColumnTracker. The
performance should remain the same for column range queries.
Test Plan: Existing unit tests. A new unit test (TestMultiColumnScanner). Load
testing using HBaseTest.
Reviewed By: kannan
Reviewers: kannan, liyintang
Commenters: liyintang
CC: hbase@lists, liyintang, mbautin, kannan
Revert Plan: OK
Differential Revision: 276188

Added:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractKeyValueScanner.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1181601&r1=1181600&r2=1181601&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/KeyValue.java Tue Oct 11 02:24:44 2011
@@ -1724,6 +1724,21 @@ public class KeyValue implements Writabl
   }
 
   /**
+   * Similar to {@link #createLastOnRow(byte[], int, int, byte[], int, int,
+   * byte[], int, int)} but operates on this {@link KeyValue} instance,
+   * taking the row and column from it.
+   *
+   * @return the last key on the row/column of this key-value pair
+   */
+  public KeyValue createLastOnRowCol() {
+    return new KeyValue(
+        bytes, getRowOffset(), getRowLength(),
+        bytes, getFamilyOffset(), getFamilyLength(),
+        bytes, getQualifierOffset(), getQualifierLength(),
+        HConstants.OLDEST_TIMESTAMP, Type.Minimum, null, 0, 0);
+  }
+
+  /**
    * @param b
    * @return A KeyValue made of a byte array that holds the key-only part.
    * Needed to convert hfile index members to KeyValues.

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java?rev=1181601&r1=1181600&r2=1181601&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/AbstractHFileReader.java Tue Oct 11 02:24:44 2011
@@ -369,4 +369,8 @@ public abstract class AbstractHFileReade
     return fsBlockReader;
   }
 
+  public Path getPath() {
+    return path;
+  }
+
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java?rev=1181601&r1=1181600&r2=1181601&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/io/hfile/HFile.java Tue Oct 11 02:24:44 2011
@@ -350,6 +350,8 @@ public class HFile {
      * version. Knows nothing about how that metadata is structured.
      */
      DataInput getBloomFilterMetadata() throws IOException;
+
+     Path getPath();
   }
 
   private static Reader pickReaderVersion(Path path, FSDataInputStream fsdis,

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractKeyValueScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractKeyValueScanner.java?rev=1181601&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractKeyValueScanner.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractKeyValueScanner.java Tue Oct 11 02:24:44 2011
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+
+public abstract class AbstractKeyValueScanner implements KeyValueScanner {
+
+  @Override
+  public boolean seekExactly(KeyValue kv, boolean forward) throws IOException {
+    return forward ? reseek(kv) : seek(kv);
+  }
+
+}

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1181601&r1=1181600&r2=1181601&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Tue Oct 11 02:24:44 2011
@@ -2761,6 +2761,10 @@ public class HRegion implements HeapSize
       }
       this.filterClosed = true;
     }
+
+    KeyValueHeap getStoreHeapForTesting() {
+      return storeHeap;
+    }
   }
 
   // Utility methods

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=1181601&r1=1181600&r2=1181601&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Tue Oct 11 02:24:44 2011
@@ -46,6 +46,30 @@ public class KeyValueHeap implements Key
   private KVScannerComparator comparator;
 
   /**
+   * A helper enum that knows how to call the correct seek function within a
+   * {@link KeyValueScanner}.
+   */
+  public enum SeekType {
+    NORMAL {
+      @Override
+      public boolean seek(KeyValueScanner scanner, KeyValue kv,
+          boolean forward) throws IOException {
+        return forward ? scanner.reseek(kv) : scanner.seek(kv);
+      }
+    },
+    EXACT {
+      @Override
+      public boolean seek(KeyValueScanner scanner, KeyValue kv,
+          boolean forward) throws IOException {
+        return scanner.seekExactly(kv, forward);
+      }
+    };
+
+    public abstract boolean seek(KeyValueScanner scanner, KeyValue kv,
+        boolean forward) throws IOException;
+  }
+
+  /**
    * Constructor.  This KeyValueHeap will handle closing of passed in
    * KeyValueScanners.
    * @param scanners
@@ -210,54 +234,53 @@ public class KeyValueHeap implements Key
    * @return true if KeyValues exist at or after specified key, false if not
    * @throws IOException
    */
+  @Override
   public boolean seek(KeyValue seekKey) throws IOException {
-    if (this.current == null) {
-      return false;
-    }
-    this.heap.add(this.current);
-    this.current = null;
-
-    KeyValueScanner scanner;
-    while((scanner = this.heap.poll()) != null) {
-      KeyValue topKey = scanner.peek();
-      if(comparator.getComparator().compare(seekKey, topKey) <= 0) { // Correct?
-        // Top KeyValue is at-or-after Seek KeyValue
-        this.current = scanner;
-        return true;
-      }
-      if(!scanner.seek(seekKey)) {
-        scanner.close();
-      } else {
-        this.heap.add(scanner);
-      }
-    }
-    // Heap is returning empty, scanner is done
-    return false;
+    return generalizedSeek(seekKey, SeekType.NORMAL, false);
   }
 
+  /**
+   * This function is identical to the {@link #seek(KeyValue)} function except
+   * that scanner.seek(seekKey) is changed to scanner.reseek(seekKey).
+   */
+  @Override
   public boolean reseek(KeyValue seekKey) throws IOException {
-    //This function is very identical to the seek(KeyValue) function except that
-    //scanner.seek(seekKey) is changed to scanner.reseek(seekKey)
-    if (this.current == null) {
+    return generalizedSeek(seekKey, SeekType.NORMAL, true);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public boolean seekExactly(KeyValue seekKey, boolean forward)
+      throws IOException {
+    return generalizedSeek(seekKey, SeekType.EXACT, forward);
+  }
+
+  private boolean generalizedSeek(KeyValue seekKey, SeekType seekType,
+      boolean forward) throws IOException {
+    if (current == null) {
       return false;
     }
-    this.heap.add(this.current);
-    this.current = null;
+    heap.add(current);
+    current = null;
 
     KeyValueScanner scanner;
-    while ((scanner = this.heap.poll()) != null) {
+    while ((scanner = heap.poll()) != null) {
       KeyValue topKey = scanner.peek();
       if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
         // Top KeyValue is at-or-after Seek KeyValue
-        this.current = scanner;
+        current = scanner;
         return true;
       }
-      if (!scanner.reseek(seekKey)) {
+
+      if (!seekType.seek(scanner, seekKey, forward)) {
         scanner.close();
       } else {
-        this.heap.add(scanner);
+        heap.add(scanner);
       }
     }
+
     // Heap is returning empty, scanner is done
     return false;
   }
@@ -273,4 +296,8 @@ public class KeyValueHeap implements Key
   public long getSequenceID() {
     return 0;
   }
+
+  KeyValueScanner getCurrentForTesting() {
+    return current;
+  }
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java?rev=1181601&r1=1181600&r2=1181601&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java Tue Oct 11 02:24:44 2011
@@ -48,7 +48,7 @@ public interface KeyValueScanner {
 
   /**
    * Reseek the scanner at or after the specified KeyValue.
-   * This method is guaranteed to seek to or before the required key only if the
+   * This method is guaranteed to seek at or after the required key only if the
    * key comes after the current position of the scanner. Should not be used
    * to seek to a key which may come before the current position.
    * @param key seek value (should be non-null)
@@ -57,6 +57,16 @@ public interface KeyValueScanner {
   public boolean reseek(KeyValue key) throws IOException;
 
   /**
+   * Similar to {@link #seek} (or {@link #reseek} if forward is true) but only
+   * does a seek operation after checking that it is really necessary for the
+   * row/column combination specified by the kv parameter. This function was
+   * added to avoid unnecessary disk seeks on multi-column get queries using
+   * Bloom filter checking. Should only be used for queries where the set of
+   * columns is specified exactly.
+   */
+  public boolean seekExactly(KeyValue kv, boolean forward) throws IOException;
+
+  /**
    * Get the sequence id associated with this KeyValueScanner. This is required
    * for comparing multiple files to find out which one has the latest data.
    * The default implementation for this would be to return 0. A file having

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1181601&r1=1181600&r2=1181601&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Tue Oct 11 02:24:44 2011
@@ -485,7 +485,7 @@ public class MemStore implements HeapSiz
    * map and snapshot.
    * This behaves as if it were a real scanner but does not maintain position.
    */
-  protected class MemStoreScanner implements KeyValueScanner {
+  protected class MemStoreScanner extends AbstractKeyValueScanner {
     // Next row information for either kvset or snapshot
     private KeyValue kvsetNextRow = null;
     private KeyValue snapshotNextRow = null;

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1181601&r1=1181600&r2=1181601&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Tue Oct 11 02:24:44 2011
@@ -63,6 +63,12 @@ public class ScanQueryMatcher {
   protected byte [] row;
 
   /**
+   * True if we are only interested in the given exact set of columns. In that
+   * case we can use Bloom filters to avoid unnecessary disk seeks.
+   */
+  private boolean exactColumnQuery;
+
+  /**
    * Constructs a ScanQueryMatcher for a Scan.
    * @param scan
    * @param family
@@ -91,8 +97,10 @@ public class ScanQueryMatcher {
       // We can share the ExplicitColumnTracker, diff is we reset
       // between rows, not between storefiles.
       this.columns = new ExplicitColumnTracker(columns,maxVersions);
+      exactColumnQuery = true;
     }
   }
+
   public ScanQueryMatcher(Scan scan, byte [] family,
       NavigableSet<byte[]> columns, long ttl,
       KeyValue.KeyComparator rowComparator, int maxVersions) {
@@ -320,6 +328,10 @@ public class ScanQueryMatcher {
         null, 0, 0);
   }
 
+  public boolean isExactColumnQuery() {
+    return exactColumnQuery;
+  }
+
   /**
    * {@link #match} return codes.  These instruct the scanner moving through
    * memstores and StoreFiles what to do with the current KeyValue.

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1181601&r1=1181600&r2=1181601&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Oct 11 02:24:44 2011
@@ -1071,38 +1071,84 @@ public class StoreFile {
 
     private boolean passesBloomFilter(Scan scan,
         final SortedSet<byte[]> columns) {
-      // Cache Bloom filter as a local variable in case it is set to null by
-      // another thread on an IO error.
-      BloomFilter bloomFilter = this.bloomFilter;
-
-      if (bloomFilter == null || !scan.isGetScan()) {
+      // Multi-column non-get scans will use Bloom filters through the
+      // lower-level API function that this function calls.
+      if (!scan.isGetScan())
         return true;
-      }
-
-      // Empty file?
-      if (reader.getTrailer().getEntryCount() == 0)
-        return false;
 
       byte[] row = scan.getStartRow();
-      byte[] key;
       switch (this.bloomFilterType) {
         case ROW:
-          key = row;
-          break;
+          return passesBloomFilter(row, 0, row.length, null, 0, 0);
 
         case ROWCOL:
           if (columns != null && columns.size() == 1) {
             byte[] column = columns.first();
-            key = bloomFilter.createBloomKey(row, 0, row.length,
-                column, 0, column.length);
-            break;
+            return passesBloomFilter(row, 0, row.length, column, 0,
+                column.length);
           }
+
+          // For multi-column queries the Bloom filter is checked from the
+          // seekExact operation.
+          return true;
+
+        default:
           return true;
+      }
+    }
+
+    /**
+     * A method for checking Bloom filters. Called directly from
+     * {@link StoreFileScanner} in case of a multi-column query.
+     *
+     * @param row
+     * @param row byte array containing the row key
+     * @param rowOffset offset of the row key within the array
+     * @param rowLen length of the row key in bytes
+     * @param col byte array containing the column qualifier, or null when
+     *          only a row is being checked (invalid for row-only filters)
+     * @param colOffset offset of the column qualifier within the array
+     * @param colLen length of the column qualifier in bytes
+     * @return true if the given row/column may be present in this store file
+     *         (i.e. the Bloom filter check passes or cannot be performed)
+     */
+    public boolean passesBloomFilter(byte[] row, int rowOffset, int rowLen,
+        byte[] col, int colOffset, int colLen) {
+      if (bloomFilter == null)
+        return true;
+
+      byte[] key;
+      switch (bloomFilterType) {
+        case ROW:
+          if (col != null) {
+            throw new RuntimeException("Row-only Bloom filter called with " +
+                "column specified");
+          }
+          if (rowOffset != 0 || rowLen != row.length) {
+              throw new AssertionError("For row-only Bloom filters the row "
+                  + "must occupy the whole array");
+          }
+          key = row;
+          break;
+
+        case ROWCOL:
+          key = bloomFilter.createBloomKey(row, rowOffset, rowLen, col,
+              colOffset, colLen);
+          break;
 
         default:
           return true;
       }
 
+      // Cache Bloom filter as a local variable in case it is set to null by
+      // another thread on an IO error.
+      BloomFilter bloomFilter = this.bloomFilter;
+
+      if (bloomFilter == null) {
+        return true;
+      }
+
+      // Empty file?
+      if (reader.getTrailer().getEntryCount() == 0)
+        return false;
+
       try {
         boolean shouldCheckBloom;
         ByteBuffer bloom;
@@ -1282,6 +1328,10 @@ public class StoreFile {
     HFile.Reader getHFileReader() {
       return reader;
     }
+
+    void disableBloomFilterForTesting() {
+      bloomFilter = null;
+    }
   }
 
   /**

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=1181601&r1=1181600&r2=1181601&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Tue Oct 11 02:24:44 2011
@@ -25,6 +25,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.regionserver.StoreFile.Reader;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -183,4 +184,32 @@ class StoreFileScanner implements KeyVal
   public long getSequenceID() {
     return reader.getSequenceID();
   }
+
+  @Override
+  public boolean seekExactly(KeyValue kv, boolean forward)
+      throws IOException {
+    if (reader.getBloomFilterType() != StoreFile.BloomType.ROWCOL ||
+        kv.getRowLength() == 0 || kv.getQualifierLength() == 0) {
+      return forward ? reseek(kv) : seek(kv);
+    }
+
+    boolean isInBloom = reader.passesBloomFilter(kv.getBuffer(),
+        kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
+        kv.getQualifierOffset(), kv.getQualifierLength());
+    if (isInBloom) {
+      // This row/column might be in this store file. Do a normal seek.
+      return forward ? reseek(kv) : seek(kv);
+    }
+
+    // Create a fake key/value, so that this scanner only bubbles up to the top
+    // of the KeyValueHeap in StoreScanner after we scanned this row/column in
+    // all other store files. The query matcher will then just skip this fake
+    // key/value and the store scanner will progress to the next column.
+    cur = kv.createLastOnRowCol();
+    return true;
+  }
+
+  Reader getReaderForTesting() {
+    return reader;
+  }
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1181601&r1=1181600&r2=1181601&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Tue Oct 11 02:24:44 2011
@@ -20,6 +20,7 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.KeyValue;
@@ -72,10 +73,14 @@ class StoreScanner implements KeyValueSc
     // pass columns = try to filter out unnecessary ScanFiles
     List<KeyValueScanner> scanners = getScanners(scan, columns);
 
-    // Seek all scanners to the start of the Row (or if the exact maching row key does not
-    // exist, then to the start of the next matching Row).
-    for(KeyValueScanner scanner : scanners) {
-      scanner.seek(matcher.getStartKey());
+    // Seek all scanners to the start of the Row (or if the exact matching row
+    // key does not exist, then to the start of the next matching Row).
+    if (matcher.isExactColumnQuery()) {
+      for (KeyValueScanner scanner : scanners)
+        scanner.seekExactly(matcher.getStartKey(), false);
+    } else {
+      for (KeyValueScanner scanner : scanners)
+        scanner.seek(matcher.getStartKey());
     }
 
     // set storeLimit
@@ -398,11 +403,31 @@ class StoreScanner implements KeyValueSc
   public synchronized boolean reseek(KeyValue kv) throws IOException {
     //Heap cannot be null, because this is only called from next() which
     //guarantees that heap will never be null before this call.
-    return this.heap.reseek(kv);
+    return matcher.isExactColumnQuery() ? heap.seekExactly(kv, true) :
+        heap.reseek(kv);
   }
 
   @Override
   public long getSequenceID() {
     return 0;
   }
+
+  @Override
+  public boolean seekExactly(KeyValue kv, boolean forward) throws IOException {
+    throw new NotImplementedException();
+  }
+
+  /**
+   * Used in testing.
+   * @return all scanners in no particular order
+   */
+  List<KeyValueScanner> getAllScannersForTesting() {
+    List<KeyValueScanner> allScanners = new ArrayList<KeyValueScanner>();
+    KeyValueScanner current = heap.getCurrentForTesting();
+    if (current != null)
+      allScanners.add(current);
+    for (KeyValueScanner scanner : heap.getHeap())
+      allScanners.add(scanner);
+    return allScanners;
+  }
 }

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java?rev=1181601&r1=1181600&r2=1181601&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java Tue Oct 11 02:24:44 2011
@@ -34,7 +34,7 @@ import java.util.List;
  * to the provided comparator, and then the whole thing pretends
  * to be a store file scanner.
  */
-public class KeyValueScanFixture implements KeyValueScanner {
+public class KeyValueScanFixture extends AbstractKeyValueScanner {
   ArrayList<KeyValue> data;
   Iterator<KeyValue> iter = null;
   KeyValue current = null;

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java?rev=1181601&r1=1181600&r2=1181601&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java Tue Oct 11 02:24:44 2011
@@ -208,7 +208,7 @@ public class TestKeyValueHeap extends HB
     }
   }
 
-  private static class Scanner implements KeyValueScanner {
+  private static class Scanner extends AbstractKeyValueScanner {
     private Iterator<KeyValue> iter;
     private KeyValue current;
     private boolean closed = false;

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java?rev=1181601&r1=1181600&r2=1181601&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMultiColumnScanner.java Tue Oct 11 02:24:44 2011
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.client.De
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.regionserver.StoreFile.BloomType;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -60,9 +61,10 @@ public class TestMultiColumnScanner {
   private static final Log LOG =
       LogFactory.getLog(TestMultiColumnScanner.class);
 
-  private static final String FAMILY = "CF";
-  private static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
-  private static final int MAX_VERSIONS = 50;
+  private static final String TABLE_NAME = "TestMultiColumnScanner";
+  static final String FAMILY = "CF";
+  static final byte[] FAMILY_BYTES = Bytes.toBytes(FAMILY);
+  static final int MAX_VERSIONS = 50;
 
   /**
    * The size of the column qualifier set used. Increasing this parameter
@@ -164,19 +166,7 @@ public class TestMultiColumnScanner {
 
   @Test
   public void testMultiColumnScanner() throws IOException {
-    String table = "TestMultiColumnScanner";
-    HColumnDescriptor hcd =
-      new HColumnDescriptor(FAMILY_BYTES, MAX_VERSIONS,
-          comprAlgo.getName(),
-          HColumnDescriptor.DEFAULT_IN_MEMORY,
-          HColumnDescriptor.DEFAULT_BLOCKCACHE,
-          HColumnDescriptor.DEFAULT_TTL,
-          bloomType.toString());
-    HTableDescriptor htd = new HTableDescriptor(table);
-    htd.addFamily(hcd);
-    HRegionInfo info = new HRegionInfo(htd, null, null, false);
-    HRegion region = HRegion.createHRegion(
-        info, HBaseTestingUtility.getTestDir(), TEST_UTIL.getConfiguration());
+    HRegion region = createRegion(TABLE_NAME, comprAlgo, bloomType);
     List<String> rows = sequentialStrings("row", NUM_ROWS);
     List<String> qualifiers = sequentialStrings("qual", NUM_COLUMNS);
     List<KeyValue> kvs = new ArrayList<KeyValue>();
@@ -221,7 +211,8 @@ public class TestMultiColumnScanner {
           Put p = new Put(Bytes.toBytes(row));
           for (long ts : TIMESTAMPS) {
             String value = createValue(row, qual, ts);
-            KeyValue kv = KeyValueTestUtil.create(row, FAMILY, qual, ts, value);
+            KeyValue kv = KeyValueTestUtil.create(row, FAMILY, qual, ts,
+                value);
             assertEquals(kv.getTimestamp(), ts);
             p.add(kv);
             String keyAsString = kv.toString();
@@ -316,6 +307,25 @@ public class TestMultiColumnScanner {
         "pairs", lastDelTimeMap.size() > 0);
     LOG.info("Number of row/col pairs deleted at least once: " +
        lastDelTimeMap.size());
+    region.close();
+  }
+
+  static HRegion createRegion(String tableName,
+      Compression.Algorithm comprAlgo, BloomType bloomType)
+      throws IOException {
+    HColumnDescriptor hcd =
+      new HColumnDescriptor(FAMILY_BYTES, MAX_VERSIONS,
+          comprAlgo.getName(),
+          HColumnDescriptor.DEFAULT_IN_MEMORY,
+          HColumnDescriptor.DEFAULT_BLOCKCACHE,
+          HColumnDescriptor.DEFAULT_TTL,
+          bloomType.toString());
+    HTableDescriptor htd = new HTableDescriptor(tableName);
+    htd.addFamily(hcd);
+    HRegionInfo info = new HRegionInfo(htd, null, null, false);
+    HRegion region = HRegion.createHRegion(
+        info, HBaseTestingUtility.getTestDir(), TEST_UTIL.getConfiguration());
+    return region;
   }
 
   private static String getRowQualStr(KeyValue kv) {
@@ -344,7 +354,7 @@ public class TestMultiColumnScanner {
     return row + "_" + qual;
   }
 
-  private static String createValue(String row, String qual, long ts) {
+  static String createValue(String row, String qual, long ts) {
     return "value_for_" + row + "_" + qual + "_" + ts;
   }
 

Added: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java?rev=1181601&view=auto
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java (added)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWithBloomError.java Tue Oct 11 02:24:44 2011
@@ -0,0 +1,183 @@
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValueTestUtil;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.hfile.Compression;
+import org.apache.hadoop.hbase.io.hfile.HFilePrettyPrinter;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import static org.apache.hadoop.hbase.regionserver.TestMultiColumnScanner.*;
+import static org.junit.Assert.*;
+
+/**
+ * Test a multi-column scanner when there is a Bloom filter false-positive.
+ * This is needed for the multi-column Bloom filter optimization: a scanner
+ * whose Bloom filter wrongly claims to contain a row/column must not corrupt
+ * the merged scan result. The last-flushed store file's Bloom filter is
+ * disabled (so it always answers "maybe present") to simulate the
+ * false-positive.
+ */
+@RunWith(Parameterized.class)
+public class TestScanWithBloomError {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestScanWithBloomError.class);
+
+  private static final String TABLE_NAME = "ScanWithBloomError";
+  private static final String ROW = "theRow";
+  private static final String QUALIFIER_PREFIX = "qual";
+  private static final byte[] ROW_BYTES = Bytes.toBytes(ROW);
+
+  // NOTE(review): static, so this set accumulates across the parameterized
+  // runs (one per Bloom filter type). It is currently write-only; kept for
+  // debugging/inspection.
+  private static NavigableSet<Integer> allColIds = new TreeSet<Integer>();
+
+  private HRegion region;
+  private StoreFile.BloomType bloomType;
+  private FileSystem fs;
+  private Configuration conf;
+
+  private final static HBaseTestingUtility TEST_UTIL =
+    new HBaseTestingUtility();
+
+  /** Runs the test once for every supported Bloom filter type. */
+  @Parameters
+  public static final Collection<Object[]> parameters() {
+    List<Object[]> configurations = new ArrayList<Object[]>();
+    for (StoreFile.BloomType bloomType : StoreFile.BloomType.values()) {
+      configurations.add(new Object[] { bloomType });
+    }
+    return configurations;
+  }
+
+  public TestScanWithBloomError(StoreFile.BloomType bloomType) {
+    this.bloomType = bloomType;
+  }
+
+  @Before
+  public void setUp() throws IOException{
+    conf = TEST_UTIL.getConfiguration();
+    fs = FileSystem.get(conf);
+  }
+
+  /**
+   * Flushes three store files with overlapping column sets, then scans a
+   * column set that partially misses each file. Column 4 exists in no file
+   * and must not appear in the result even with the Bloom filter of the
+   * last file disabled.
+   */
+  @Test
+  public void testThreeStoreFiles() throws IOException {
+    region = createRegion(TABLE_NAME, Compression.Algorithm.GZ, bloomType);
+    createStoreFile(new int[] {1, 2, 6});
+    createStoreFile(new int[] {1, 2, 3, 7});
+    createStoreFile(new int[] {1, 9});
+    scanColSet(new int[]{1, 4, 6, 7}, new int[]{1, 6, 7});
+
+    region.close();
+  }
+
+  /**
+   * Scans the requested columns of the single test row and asserts that
+   * exactly {@code expectedResultCols} come back, in order. Before scanning,
+   * disables the Bloom filter of the most recently flushed store file so it
+   * behaves as an always-"maybe" filter (a guaranteed false positive for
+   * absent columns).
+   */
+  private void scanColSet(int[] colSet, int[] expectedResultCols)
+      throws IOException {
+    LOG.info("Scanning column set: " + Arrays.toString(colSet));
+    Scan scan = new Scan(ROW_BYTES, ROW_BYTES);
+    addColumnSetToScan(scan, colSet);
+    HRegion.RegionScanner scanner = (HRegion.RegionScanner)
+        region.getScanner(scan);
+    KeyValueHeap storeHeap = scanner.getStoreHeapForTesting();
+    // Single store, so the heap holds only the "current" scanner.
+    assertEquals(0, storeHeap.getHeap().size());
+    StoreScanner storeScanner =
+        (StoreScanner) storeHeap.getCurrentForTesting();
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    List<StoreFileScanner> scanners = (List<StoreFileScanner>)
+        (List) storeScanner.getAllScannersForTesting();
+
+    // Sort scanners by their HFile's modification time, oldest first.
+    Collections.sort(scanners, new Comparator<StoreFileScanner>() {
+      @Override
+      public int compare(StoreFileScanner s1, StoreFileScanner s2) {
+        Path p1 = s1.getReaderForTesting().getHFileReader().getPath();
+        Path p2 = s2.getReaderForTesting().getHFileReader().getPath();
+        long t1, t2;
+        try {
+          t1 = fs.getFileStatus(p1).getModificationTime();
+          t2 = fs.getFileStatus(p2).getModificationTime();
+        } catch (IOException ex) {
+          throw new RuntimeException(ex);
+        }
+        // Fixed: the original returned 1 on t1 == t2 and 0 on t1 > t2,
+        // violating the Comparator contract (compare(a, a) must be 0 and
+        // sgn(compare(a, b)) must equal -sgn(compare(b, a))).
+        return t1 < t2 ? -1 : t1 > t2 ? 1 : 0;
+      }
+    });
+
+    // After sorting, the last scanner belongs to the most recent flush.
+    StoreFile.Reader lastStoreFileReader = null;
+    for (StoreFileScanner sfScanner : scanners)
+      lastStoreFileReader = sfScanner.getReaderForTesting();
+
+    new HFilePrettyPrinter().run(new String[]{ "-m", "-p", "-f",
+        lastStoreFileReader.getHFileReader().getPath().toString()});
+
+    // Disable Bloom filter for the last store file. The disabled Bloom filter
+    // will always return "true".
+    LOG.info("Disabling Bloom filter for: "
+        + lastStoreFileReader.getHFileReader().getName());
+    lastStoreFileReader.disableBloomFilterForTesting();
+
+    List<KeyValue> allResults = new ArrayList<KeyValue>();
+
+    { // Limit the scope of results.
+      List<KeyValue> results = new ArrayList<KeyValue>();
+      while (scanner.next(results) || results.size() > 0) {
+        allResults.addAll(results);
+        results.clear();
+      }
+    }
+
+    // Recover the numeric column ids from the returned qualifiers.
+    List<Integer> actualIds = new ArrayList<Integer>();
+    for (KeyValue kv : allResults) {
+      String qual = Bytes.toString(kv.getQualifier());
+      assertTrue(qual.startsWith(QUALIFIER_PREFIX));
+      actualIds.add(Integer.valueOf(qual.substring(
+          QUALIFIER_PREFIX.length())));
+    }
+    List<Integer> expectedIds = new ArrayList<Integer>();
+    for (int expectedId : expectedResultCols)
+      expectedIds.add(expectedId);
+
+    LOG.info("Column ids returned: " + actualIds + ", expected: "
+        + expectedIds);
+    assertEquals(expectedIds.toString(), actualIds.toString());
+  }
+
+  /** Adds one column per id (FAMILY_BYTES:qual&lt;id&gt;) to the scan. */
+  private void addColumnSetToScan(Scan scan, int[] colIds) {
+    for (int colId : colIds)
+      scan.addColumn(FAMILY_BYTES, Bytes.toBytes(qualFromId(colId)));
+  }
+
+  /** Maps a numeric column id to its qualifier string, e.g. 7 -> "qual7". */
+  private String qualFromId(int colId) {
+    return QUALIFIER_PREFIX + colId;
+  }
+
+  /**
+   * Puts one KeyValue per column id into the single test row and flushes,
+   * producing exactly one new store file containing those columns.
+   */
+  private void createStoreFile(int[] colIds)
+      throws IOException {
+    Put p = new Put(ROW_BYTES);
+    for (int colId : colIds) {
+      long ts = Long.MAX_VALUE;
+      String qual = qualFromId(colId);
+      allColIds.add(colId);
+      KeyValue kv = KeyValueTestUtil.create(ROW, FAMILY,
+          qual, ts, createValue(ROW, qual, ts));
+      p.add(kv);
+    }
+    region.put(p);
+    region.flushcache();
+  }
+
+}