You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ns...@apache.org on 2011/10/11 19:45:39 UTC

svn commit: r1181983 - in /hbase/branches/0.89/src: main/java/org/apache/hadoop/hbase/ main/java/org/apache/hadoop/hbase/regionserver/ test/java/org/apache/hadoop/hbase/regionserver/

Author: nspiegelberg
Date: Tue Oct 11 17:45:38 2011
New Revision: 1181983

URL: http://svn.apache.org/viewvc?rev=1181983&view=rev
Log:
Lazy-seek optimization for StoreFile scanners

Summary:
Previously, if we had several StoreFiles for a column family in a region, we
would seek in each of them and only then merge the results, even though the
row/column we are looking for might only be in the most recent (and the
smallest) file. Now we prioritize our reads from those files so that we check
the most recent file first. This is done by doing a "lazy seek" which pretends
that the next value in the StoreFile is (seekRow, seekColumn,
lastTimestampInStoreFile), which is earlier in the KV order than anything that
might actually occur in the file. So if we don't find the result in earlier
files, that fake KV will bubble up to the top of the KV heap and a real seek
will be done. This is expected to significantly reduce the amount of disk IO for
Prod Cluster (but we need careful testing and measurement).

This is joint work with Liyin Tang -- huge thanks to him for many helpful
discussions on this and the idea of putting fake KVs with the highest timestamp
of the StoreFile in the scanner priority queue.

Test Plan:
Existing unit tests.
A new unit test will be written.
Test on dev cluster and dark launch.

Reviewers: liyintang, kannan, kranganathan

Reviewed By: liyintang

CC: hbase-eng@lists, hbase@lists, liyintang, kannan

Differential Revision: 326359

Revert Plan: OK

Added:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
Removed:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/AbstractKeyValueScanner.java
Modified:
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
    hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
    hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1181983&r1=1181982&r2=1181983&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/KeyValue.java Tue Oct 11 17:45:38 2011
@@ -1740,6 +1740,21 @@ public class KeyValue implements Writabl
   }
 
   /**
+   * Creates the first KV with the row/family/qualifier of this KV and the
+   * given timestamp. Uses the "maximum" KV type that guarantees that the new
+   * KV is the lowest possible for this combination of row, family, qualifier,
+   * and timestamp. This KV's own timestamp is ignored. While this function
+   * copies the value from this KV, it is normally used on key-only KVs.
+   */
+  public KeyValue createFirstOnRowColTS(long ts) {
+    return new KeyValue(
+        bytes, getRowOffset(), getRowLength(),
+        bytes, getFamilyOffset(), getFamilyLength(),
+        bytes, getQualifierOffset(), getQualifierLength(),
+        ts, Type.Maximum, bytes, getValueOffset(), getValueLength());
+  }
+
+  /**
    * @param b
    * @return A KeyValue made of a byte array that holds the key-only part.
    * Needed to convert hfile index members to KeyValues.

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java?rev=1181983&r1=1181982&r2=1181983&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnCount.java Tue Oct 11 17:45:38 2011
@@ -106,16 +106,4 @@ public class ColumnCount {
     this.count = count;
   }
 
-
-  /**
-   * Check to see if needed to fetch more versions
-   * @param max
-   * @return true if more versions are needed, false otherwise
-   */
-  public boolean needMore(int max) {
-    if(this.count < max) {
-      return true;
-    }
-    return false;
-  }
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=1181983&r1=1181982&r2=1181983&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Tue Oct 11 17:45:38 2011
@@ -40,34 +40,23 @@ import java.util.PriorityQueue;
  * also implements InternalScanner.  WARNING: As is, if you try to use this
  * as an InternalScanner at the Store level, you will get runtime exceptions.
  */
-public class KeyValueHeap implements KeyValueScanner, InternalScanner {
+public class KeyValueHeap extends NonLazyKeyValueScanner
+    implements KeyValueScanner, InternalScanner {
   private PriorityQueue<KeyValueScanner> heap = null;
-  private KeyValueScanner current = null;
-  private KVScannerComparator comparator;
 
   /**
-   * A helper enum that knows how to call the correct seek function within a
-   * {@link KeyValueScanner}.
+   * The current sub-scanner, i.e. the one that contains the next key/value
+   * to return to the client. This scanner is NOT included in {@link #heap}
+   * (but we frequently add it back to the heap and pull the new winner out).
+   * We maintain an invariant that the current sub-scanner has already done
+   * a real seek, and that current.peek() is always a real key/value (or null)
+   * except for the fake last-key-on-row-column supplied by the multi-column
+   * Bloom filter optimization, which is OK to propagate to StoreScanner. In
+   * order to ensure that, always use {@link #pollRealKV()} to update current.
    */
-  public enum SeekType {
-    NORMAL {
-      @Override
-      public boolean seek(KeyValueScanner scanner, KeyValue kv,
-          boolean forward) throws IOException {
-        return forward ? scanner.reseek(kv) : scanner.seek(kv);
-      }
-    },
-    EXACT {
-      @Override
-      public boolean seek(KeyValueScanner scanner, KeyValue kv,
-          boolean forward) throws IOException {
-        return scanner.seekExactly(kv, forward);
-      }
-    };
+  private KeyValueScanner current = null;
 
-    public abstract boolean seek(KeyValueScanner scanner, KeyValue kv,
-        boolean forward) throws IOException;
-  }
+  private KVScannerComparator comparator;
 
   /**
    * Constructor.  This KeyValueHeap will handle closing of passed in
@@ -76,7 +65,7 @@ public class KeyValueHeap implements Key
    * @param comparator
    */
   public KeyValueHeap(List<? extends KeyValueScanner> scanners,
-      KVComparator comparator) {
+      KVComparator comparator) throws IOException {
     this.comparator = new KVScannerComparator(comparator);
     if (!scanners.isEmpty()) {
       this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
@@ -88,7 +77,7 @@ public class KeyValueHeap implements Key
           scanner.close();
         }
       }
-      this.current = heap.poll();
+      this.current = pollRealKV();
     }
   }
 
@@ -107,13 +96,13 @@ public class KeyValueHeap implements Key
     KeyValue kvNext = this.current.peek();
     if (kvNext == null) {
       this.current.close();
-      this.current = this.heap.poll();
+      this.current = pollRealKV();
     } else {
       KeyValueScanner topScanner = this.heap.peek();
       if (topScanner == null ||
           this.comparator.compare(kvNext, topScanner.peek()) >= 0) {
         this.heap.add(this.current);
-        this.current = this.heap.poll();
+        this.current = pollRealKV();
       }
     }
     return kvReturn;
@@ -149,7 +138,7 @@ public class KeyValueHeap implements Key
     } else {
       this.heap.add(this.current);
     }
-    this.current = this.heap.poll();
+    this.current = pollRealKV();
     return (this.current != null);
   }
 
@@ -230,13 +219,20 @@ public class KeyValueHeap implements Key
    * <p>
    * As individual scanners may run past their ends, those scanners are
    * automatically closed and removed from the heap.
+   * <p>
+   * This function (and {@link #reseek(KeyValue)}) does not do multi-column
+   * Bloom filter and lazy-seek optimizations. To enable those, call
+   * {@link #requestSeek(KeyValue, boolean, boolean)}.
    * @param seekKey KeyValue to seek at or after
    * @return true if KeyValues exist at or after specified key, false if not
    * @throws IOException
    */
   @Override
   public boolean seek(KeyValue seekKey) throws IOException {
-    return generalizedSeek(seekKey, SeekType.NORMAL, false);
+    return generalizedSeek(false,    // This is not a lazy seek
+                           seekKey,
+                           false,    // forward (false: this is not a reseek)
+                           false);   // Not using Bloom filters
   }
 
   /**
@@ -245,20 +241,36 @@ public class KeyValueHeap implements Key
    */
   @Override
   public boolean reseek(KeyValue seekKey) throws IOException {
-    return generalizedSeek(seekKey, SeekType.NORMAL, true);
+    return generalizedSeek(false,    // This is not a lazy seek
+                           seekKey,
+                           true,     // forward (true because this is reseek)
+                           false);   // Not using Bloom filters
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public boolean seekExactly(KeyValue seekKey, boolean forward)
-      throws IOException {
-    return generalizedSeek(seekKey, SeekType.EXACT, forward);
+  public boolean requestSeek(KeyValue key, boolean forward,
+      boolean useBloom) throws IOException {
+    return generalizedSeek(true, key, forward, useBloom);
   }
 
-  private boolean generalizedSeek(KeyValue seekKey, SeekType seekType,
-      boolean forward) throws IOException {
+  /**
+   * @param isLazy whether we are trying to seek to exactly the given row/col.
+   *          Enables Bloom filter and most-recent-file-first optimizations for
+   *          multi-column get/scan queries.
+   * @param seekKey key to seek to
+   * @param forward whether to seek forward (also known as reseek)
+   * @param useBloom whether to optimize seeks using Bloom filters
+   */
+  private boolean generalizedSeek(boolean isLazy, KeyValue seekKey,
+      boolean forward, boolean useBloom) throws IOException {
+    if (!isLazy && useBloom) {
+      throw new IllegalArgumentException("Multi-column Bloom filter " +
+          "optimization requires a lazy seek");
+    }
+
     if (current == null) {
       return false;
     }
@@ -269,12 +281,26 @@ public class KeyValueHeap implements Key
     while ((scanner = heap.poll()) != null) {
       KeyValue topKey = scanner.peek();
       if (comparator.getComparator().compare(seekKey, topKey) <= 0) {
-        // Top KeyValue is at-or-after Seek KeyValue
-        current = scanner;
-        return true;
+        // Top KeyValue is at-or-after Seek KeyValue. We only know that all
+        // scanners are at or after seekKey (because fake keys of
+        // "lazily-seeked" scanners are not greater than their real next keys),
+        // but we still need to enforce our invariant that the top scanner has
+        // done a real seek. This way StoreScanner and RegionScanner do not
+        // have to worry about fake keys.
+        heap.add(scanner);
+        current = pollRealKV();
+        return current != null;
       }
 
-      if (!seekType.seek(scanner, seekKey, forward)) {
+      boolean seekResult;
+      if (isLazy) {
+        seekResult = scanner.requestSeek(seekKey, forward, useBloom);
+      } else {
+        seekResult = NonLazyKeyValueScanner.doRealSeek(
+            scanner, seekKey, forward);
+      }
+
+      if (!seekResult) {
         scanner.close();
       } else {
         heap.add(scanner);
@@ -286,6 +312,61 @@ public class KeyValueHeap implements Key
   }
 
   /**
+   * Fetches the top sub-scanner from the priority queue, ensuring that a real
+   * seek has been done on it. Works by fetching the top sub-scanner, and if it
+   * has not done a real seek, making it do so (which will modify its top KV),
+   * putting it back, and repeating this until success. Relies on the fact that
+   * on a lazy seek we set the current key of a StoreFileScanner to a KV that
+   * is not greater than the real next KV to be read from that file, so the
+   * scanner that bubbles up to the top of the heap will have global next KV in
+   * this scanner heap if (1) it has done a real seek and (2) its KV is the top
+   * among all top KVs (some of which are fake) in the scanner heap.
+   */
+  private KeyValueScanner pollRealKV() throws IOException {
+    KeyValueScanner kvScanner = heap.poll();
+    if (kvScanner == null) {
+      return null;
+    }
+
+    while (kvScanner != null && !kvScanner.realSeekDone()) {
+      if (kvScanner.peek() != null) {
+        kvScanner.enforceSeek();
+        KeyValue curKV = kvScanner.peek();
+        if (curKV != null) {
+          KeyValueScanner nextEarliestScanner = heap.peek();
+          if (nextEarliestScanner == null) {
+            // The heap is empty. Return the only possible scanner.
+            return kvScanner;
+          }
+
+          // Compare the current scanner to the next scanner. We try to avoid
+          // putting the current one back into the heap if possible.
+          KeyValue nextKV = nextEarliestScanner.peek();
+          if (nextKV == null || comparator.compare(curKV, nextKV) <= 0) {
+            // We already have the scanner with the earliest KV, so return it.
+            return kvScanner;
+          }
+
+          // Otherwise, put the scanner back into the heap and let it compete
+          // against all other (both "real-seeked" and "lazy-seeked") scanners.
+          heap.add(kvScanner);
+        } else {
+          // Close the scanner because we did a real seek and found out there
+          // are no more KVs.
+          kvScanner.close();
+        }
+      } else {
+        // Close the scanner because it has already run out of KVs even before
+        // we had to do a real seek on it.
+        kvScanner.close();
+      }
+      kvScanner = heap.poll();
+    }
+
+    return kvScanner;
+  }
+
+  /**
    * @return the current Heap
    */
   public PriorityQueue<KeyValueScanner> getHeap() {

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java?rev=1181983&r1=1181982&r2=1181983&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueScanner.java Tue Oct 11 17:45:38 2011
@@ -57,16 +57,6 @@ public interface KeyValueScanner {
   public boolean reseek(KeyValue key) throws IOException;
 
   /**
-   * Similar to {@link #seek} (or {@link #reseek} if forward is true) but only
-   * does a seek operation after checking that it is really necessary for the
-   * row/column combination specified by the kv parameter. This function was
-   * added to avoid unnecessary disk seeks on multi-column get queries using
-   * Bloom filter checking. Should only be used for queries where the set of
-   * columns is specified exactly.
-   */
-  public boolean seekExactly(KeyValue kv, boolean forward) throws IOException;
-
-  /**
    * Get the sequence id associated with this KeyValueScanner. This is required
    * for comparing multiple files to find out which one has the latest data.
    * The default implementation for this would be to return 0. A file having
@@ -78,4 +68,37 @@ public interface KeyValueScanner {
    * Close the KeyValue scanner.
    */
   public void close();
+
+  // "Lazy scanner" optimizations
+
+  /**
+   * Similar to {@link #seek} (or {@link #reseek} if forward is true) but only
+   * does a seek operation after checking that it is really necessary for the
+   * row/column combination specified by the kv parameter. This function was
+   * added to avoid unnecessary disk seeks by checking row-column Bloom filters
+   * before a seek on multi-column get/scan queries, and to optimize by looking
+   * up more recent files first.
+   * @param forward do a forward-only "reseek" instead of a random-access seek
+   * @param useBloom whether to enable multi-column Bloom filter optimization
+   */
+  public boolean requestSeek(KeyValue kv, boolean forward, boolean useBloom)
+      throws IOException;
+
+  /**
+   * We optimize our store scanners by checking the most recent store file
+   * first, so we sometimes pretend we have done a seek but delay it until the
+   * store scanner bubbles up to the top of the key-value heap. This method is
+   * then used to ensure the top store file scanner is seeked.
+   */
+  public boolean realSeekDone();
+
+  /**
+   * Does the real seek operation in case it was skipped by
+   * {@link #requestSeek(KeyValue, boolean, boolean)}. Note that this function
+   * should never be called on scanners that always do real seek operations
+   * (i.e. most of the scanners). The easiest way to achieve this is to call
+   * {@link #realSeekDone()} first.
+   */
+  public void enforceSeek() throws IOException;
+
 }

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=1181983&r1=1181982&r2=1181983&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Tue Oct 11 17:45:38 2011
@@ -485,7 +485,7 @@ public class MemStore implements HeapSiz
    * map and snapshot.
    * This behaves as if it were a real scanner but does not maintain position.
    */
-  protected class MemStoreScanner extends AbstractKeyValueScanner {
+  protected class MemStoreScanner extends NonLazyKeyValueScanner {
     // Next row information for either kvset or snapshot
     private KeyValue kvsetNextRow = null;
     private KeyValue snapshotNextRow = null;

Added: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java?rev=1181983&view=auto
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java (added)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/NonLazyKeyValueScanner.java Tue Oct 11 17:45:38 2011
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.hbase.KeyValue;
+
+/**
+ * A "non-lazy" scanner which always does a real seek operation. Most scanners
+ * are inherited from this class.
+ */
+public abstract class NonLazyKeyValueScanner implements KeyValueScanner {
+
+  @Override
+  public boolean requestSeek(KeyValue kv, boolean forward, boolean useBloom)
+      throws IOException {
+    return doRealSeek(this, kv, forward);
+  }
+
+  @Override
+  public boolean realSeekDone() {
+    return true;
+  }
+
+  @Override
+  public void enforceSeek() throws IOException {
+    throw new NotImplementedException("enforceSeek must not be called on a " +
+        "non-lazy scanner");
+  }
+
+  public static boolean doRealSeek(KeyValueScanner scanner,
+      KeyValue kv, boolean forward) throws IOException {
+    return forward ? scanner.reseek(kv) : scanner.seek(kv);
+  }
+
+}

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1181983&r1=1181982&r2=1181983&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Tue Oct 11 17:45:38 2011
@@ -63,12 +63,6 @@ public class ScanQueryMatcher {
   protected byte [] row;
 
   /**
-   * True if we are only interested in the given exact set of columns. In that
-   * case we can use Bloom filters to avoid unnecessary disk seeks.
-   */
-  private boolean exactColumnQuery;
-
-  /**
    * Constructs a ScanQueryMatcher for a Scan.
    * @param scan
    * @param family
@@ -97,12 +91,6 @@ public class ScanQueryMatcher {
       // We can share the ExplicitColumnTracker, diff is we reset
       // between rows, not between storefiles.
       this.columns = new ExplicitColumnTracker(columns,maxVersions);
-
-      // Set the "exact column query" flag to enable row-column Bloom filter
-      // optimization. We avoid checking row-column Bloom filters twice for
-      // single-column get queries, because they are already being checked
-      // in StoreFile.shouldSeek.
-      exactColumnQuery = !(scan.isGetScan() && columns.size() == 1);
     }
   }
 
@@ -333,10 +321,6 @@ public class ScanQueryMatcher {
         null, 0, 0);
   }
 
-  public boolean isExactColumnQuery() {
-    return exactColumnQuery;
-  }
-
   /**
    * {@link #match} return codes.  These instruct the scanner moving through
    * memstores and StoreFiles what to do with the current KeyValue.

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1181983&r1=1181982&r2=1181983&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Oct 11 17:45:38 2011
@@ -1342,6 +1342,10 @@ public class StoreFile {
     void disableBloomFilterForTesting() {
       bloomFilter = null;
     }
+
+    public long getMaxTimestamp() {
+      return timeRangeTracker.maximumTimestamp;
+    }
   }
 
   /**

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=1181983&r1=1181982&r2=1181983&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Tue Oct 11 17:45:38 2011
@@ -46,6 +46,10 @@ class StoreFileScanner implements KeyVal
   private final HFileScanner hfs;
   private KeyValue cur = null;
 
+  private boolean realSeekDone;
+  private boolean delayedReseek;
+  private KeyValue delayedSeekKV;
+
   private static final AtomicLong seekCount = new AtomicLong();
 
   /**
@@ -114,12 +118,16 @@ class StoreFileScanner implements KeyVal
   public boolean seek(KeyValue key) throws IOException {
     seekCount.incrementAndGet();
     try {
-      if(!seekAtOrAfter(hfs, key)) {
-        close();
-        return false;
+      try {
+        if(!seekAtOrAfter(hfs, key)) {
+          close();
+          return false;
+        }
+        cur = hfs.getKeyValue();
+        return true;
+      } finally {
+        realSeekDone = true;
       }
-      cur = hfs.getKeyValue();
-      return true;
     } catch(IOException ioe) {
       throw new IOException("Could not seek " + this, ioe);
     }
@@ -128,12 +136,16 @@ class StoreFileScanner implements KeyVal
   public boolean reseek(KeyValue key) throws IOException {
     seekCount.incrementAndGet();
     try {
-      if (!reseekAtOrAfter(hfs, key)) {
-        close();
-        return false;
+      try {
+        if (!reseekAtOrAfter(hfs, key)) {
+          close();
+          return false;
+        }
+        cur = hfs.getKeyValue();
+        return true;
+      } finally {
+        realSeekDone = true;
       }
-      cur = hfs.getKeyValue();
-      return true;
     } catch (IOException ioe) {
       throw new IOException("Could not seek " + this, ioe);
     }
@@ -189,27 +201,72 @@ class StoreFileScanner implements KeyVal
     return reader.getSequenceID();
   }
 
+  /**
+   * Pretend we have done a seek but don't do it yet, if possible. The hope is
+   * that we find requested columns in more recent files and won't have to seek
+   * in older files. Creates a fake key/value with the given row/column and the
+   * highest (most recent) possible timestamp we might get from this file. When
+   * users of such "lazy scanner" need to know the next KV precisely (e.g. when
+   * this scanner is at the top of the heap), they run {@link #enforceSeek()}.
+   * <p>
+   * Note that this function does guarantee that the current KV of this scanner
+   * will be advanced to at least the given KV. Because of this, it does have
+   * to do a real seek in cases when the seek timestamp is older than the
+   * highest timestamp of the file, e.g. when we are trying to seek to the next
+   * row/column and use OLDEST_TIMESTAMP in the seek key.
+   */
   @Override
-  public boolean seekExactly(KeyValue kv, boolean forward)
+  public boolean requestSeek(KeyValue kv, boolean forward, boolean useBloom)
       throws IOException {
     if (reader.getBloomFilterType() != StoreFile.BloomType.ROWCOL ||
-        kv.getRowLength() == 0 || kv.getQualifierLength() == 0) {
-      return forward ? reseek(kv) : seek(kv);
+        kv.getFamilyLength() == 0) {
+      useBloom = false;
     }
 
-    boolean isInBloom = reader.passesBloomFilter(kv.getBuffer(),
-        kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
-        kv.getQualifierOffset(), kv.getQualifierLength());
-    if (isInBloom) {
-      // This row/column might be in this store file. Do a normal seek.
-      return forward ? reseek(kv) : seek(kv);
+    boolean haveToSeek = true;
+    if (useBloom) {
+      haveToSeek = reader.passesBloomFilter(kv.getBuffer(),
+          kv.getRowOffset(), kv.getRowLength(), kv.getBuffer(),
+          kv.getQualifierOffset(), kv.getQualifierLength());
+    }
+
+    delayedReseek = forward;
+    delayedSeekKV = kv;
+
+    if (haveToSeek) {
+      // This row/column might be in this store file (or we did not use the
+      // Bloom filter), so we still need to seek.
+      realSeekDone = false;
+      long maxTimestampInFile = reader.getMaxTimestamp();
+      long seekTimestamp = kv.getTimestamp();
+      if (seekTimestamp > maxTimestampInFile) {
+        // Create a fake key that is not greater than the real next key.
+        // (Lower timestamps correspond to higher KVs.)
+        // To understand this better, consider that we are asked to seek to
+        // a higher timestamp than the max timestamp in this file. We know that
+        // the next point when we have to consider this file again is when we
+        // pass the max timestamp of this file (with the same row/column).
+        cur = kv.createFirstOnRowColTS(maxTimestampInFile);
+      } else {
+        // This will be the case e.g. when we need to seek to the next
+        // row/column, and we don't know exactly what they are, so we set the
+        // seek key's timestamp to OLDEST_TIMESTAMP to skip the rest of this
+        // row/column.
+        enforceSeek();
+      }
+      return cur != null;
     }
 
+    // Multi-column Bloom filter optimization.
     // Create a fake key/value, so that this scanner only bubbles up to the top
     // of the KeyValueHeap in StoreScanner after we scanned this row/column in
     // all other store files. The query matcher will then just skip this fake
-    // key/value and the store scanner will progress to the next column.
+    // key/value and the store scanner will progress to the next column. This
+    // is obviously not a "real real" seek, but unlike the fake KV earlier in
+    // this method, we want this to be propagated to ScanQueryMatcher.
     cur = kv.createLastOnRowCol();
+
+    realSeekDone = true;
     return true;
   }
 
@@ -217,6 +274,23 @@ class StoreFileScanner implements KeyVal
     return reader;
   }
 
+  @Override
+  public boolean realSeekDone() {
+    return realSeekDone;
+  }
+
+  @Override
+  public void enforceSeek() throws IOException {
+    if (realSeekDone)
+      return;
+
+    if (delayedReseek) {
+      reseek(delayedSeekKV);
+    } else {
+      seek(delayedSeekKV);
+    }
+  }
+
   // Test methods
 
   static final long getSeekCount() {

Modified: hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1181983&r1=1181982&r2=1181983&view=diff
==============================================================================
--- hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/branches/0.89/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Tue Oct 11 17:45:38 2011
@@ -25,7 +25,6 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.NavigableSet;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.KeyValue;
@@ -36,7 +35,8 @@ import org.apache.hadoop.hbase.util.Byte
  * Scanner scans both the memstore and the HStore. Coalesce KeyValue stream
  * into List<KeyValue> for a single row.
  */
-class StoreScanner implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
+class StoreScanner extends NonLazyKeyValueScanner
+    implements KeyValueScanner, InternalScanner, ChangedReadersObserver {
   static final Log LOG = LogFactory.getLog(StoreScanner.class);
   private Store store;
   private ScanQueryMatcher matcher;
@@ -49,6 +49,8 @@ class StoreScanner implements KeyValueSc
   // Doesn't need to be volatile because it's always accessed via synchronized methods
   private boolean closing = false;
   private final boolean isGet;
+  private final boolean explicitColumnQuery;
+  private final boolean useRowColBloom;
 
   /** We don't ever expect to change this, the constant is just for clarity. */
   static final boolean LAZY_SEEK_ENABLED_BY_DEFAULT = true;
@@ -60,6 +62,23 @@ class StoreScanner implements KeyValueSc
   // if heap == null and lastTop != null, you need to reseek given the key below
   private KeyValue lastTop = null;
 
+  /** An internal constructor. */
+  private StoreScanner(Store store, boolean cacheBlocks, Scan scan,
+      final NavigableSet<byte[]> columns){
+    this.store = store;
+    initializeMetricNames();
+    this.cacheBlocks = cacheBlocks;
+    isGet = scan.isGetScan();
+    int numCol = columns == null ? 0 : columns.size();
+    explicitColumnQuery = numCol > 0;
+
+    // We look up row-column Bloom filters for multi-column queries as part of
+    // the seek operation. However, we also look up the row-column Bloom filter
+    // for multi-row (non-"get") scans because this is not done in
+    // StoreFile.passesBloomFilter(Scan, SortedSet<byte[]>).
+    useRowColBloom = numCol > 1 || (!isGet && numCol == 1);
+  }
+
   /**
    * Opens a scanner across memstore, snapshot, and all StoreFiles.
    *
@@ -70,27 +89,25 @@ class StoreScanner implements KeyValueSc
    */
   StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
                               throws IOException {
-    this.store = store;
-    initializeMetricNames();
-
-    this.cacheBlocks = scan.getCacheBlocks();
+    this(store, scan.getCacheBlocks(), scan, columns);
     matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
         columns, store.ttl, store.comparator.getRawComparator(),
         store.versionsToReturn(scan.getMaxVersions()),
         false);
 
-    this.isGet = scan.isGetScan();
-    // pass columns = try to filter out unnecessary ScanFiles
+    // Pass columns to try to filter out unnecessary StoreFiles.
     List<KeyValueScanner> scanners = getScanners(scan, columns);
 
     // Seek all scanners to the start of the Row (or if the exact matching row
     // key does not exist, then to the start of the next matching Row).
-    if (matcher.isExactColumnQuery()) {
-      for (KeyValueScanner scanner : scanners)
-        scanner.seekExactly(matcher.getStartKey(), false);
+    if (explicitColumnQuery && lazySeekEnabledGlobally) {
+      for (KeyValueScanner scanner : scanners) {
+        scanner.requestSeek(matcher.getStartKey(), false, useRowColBloom);
+      }
     } else {
-      for (KeyValueScanner scanner : scanners)
+      for (KeyValueScanner scanner : scanners) {
         scanner.seek(matcher.getStartKey());
+      }
     }
 
     // set storeLimit
@@ -113,11 +130,8 @@ class StoreScanner implements KeyValueSc
   StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners,
                             boolean retainDeletesInOutput)
       throws IOException {
-    this.store = store;
-    this.initializeMetricNames();
+    this(store, false, scan, null);
 
-    this.cacheBlocks = false;
-    this.isGet = false;
     matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
         null, store.ttl, store.comparator.getRawComparator(),
         store.versionsToReturn(scan.getMaxVersions()),
@@ -138,11 +152,7 @@ class StoreScanner implements KeyValueSc
       final NavigableSet<byte[]> columns,
       final List<KeyValueScanner> scanners)
         throws IOException {
-    this.store = null;
-    this.initializeMetricNames();
-
-    this.isGet = false;
-    this.cacheBlocks = scan.getCacheBlocks();
+    this(null, scan.getCacheBlocks(), scan, columns);
     this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
         comparator.getRawComparator(), scan.getMaxVersions(),
         false);
@@ -264,7 +274,6 @@ class StoreScanner implements KeyValueSc
    */
   @Override
   public synchronized boolean next(List<KeyValue> outResult, int limit) throws IOException {
-    //DebugPrint.println("SS.next");
 
     checkReseek();
 
@@ -282,8 +291,8 @@ class StoreScanner implements KeyValueSc
     }
 
     if ((matcher.row == null) || !peeked.matchingRow(matcher.row)) {
-	this.countPerRow = 0;
-	matcher.setRow(peeked.getRow());
+      this.countPerRow = 0;
+      matcher.setRow(peeked.getRow());
     }
     KeyValue kv;
     List<KeyValue> results = new ArrayList<KeyValue>();
@@ -291,7 +300,6 @@ class StoreScanner implements KeyValueSc
       // kv is no longer immutable due to KeyOnlyFilter! use copy for safety
       KeyValue copyKv = new KeyValue(kv.getBuffer(), kv.getOffset(), kv.getLength());
       ScanQueryMatcher.MatchCode qcode = matcher.match(copyKv);
-      //DebugPrint.println("SS peek kv = " + kv + " with qcode = " + qcode);
       switch(qcode) {
         case INCLUDE:
         case INCLUDE_AND_SEEK_NEXT_ROW:
@@ -341,8 +349,8 @@ class StoreScanner implements KeyValueSc
           return false;
 
         case SEEK_NEXT_ROW:
-          // This is just a relatively simple end of scan fix, to short-cut end us if there is a
-          // endKey in the scan.
+          // This is just a relatively simple end-of-scan fix, to short-circuit
+          // and end the scan if it has an endKey.
           if (!matcher.moreRowsMayExistAfter(kv)) {
             outResult.addAll(results);
             return false;
@@ -456,8 +464,11 @@ class StoreScanner implements KeyValueSc
   public synchronized boolean reseek(KeyValue kv) throws IOException {
     //Heap cannot be null, because this is only called from next() which
     //guarantees that heap will never be null before this call.
-    return matcher.isExactColumnQuery() ? heap.seekExactly(kv, true) :
-        heap.reseek(kv);
+    if (explicitColumnQuery && lazySeekEnabledGlobally) {
+      return heap.requestSeek(kv, true, useRowColBloom);
+    } else {
+      return heap.reseek(kv);
+    }
   }
 
   @Override
@@ -465,11 +476,6 @@ class StoreScanner implements KeyValueSc
     return 0;
   }
 
-  @Override
-  public boolean seekExactly(KeyValue kv, boolean forward) throws IOException {
-    throw new NotImplementedException();
-  }
-
   /**
    * Used in testing.
    * @return all scanners in no particular order

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java?rev=1181983&r1=1181982&r2=1181983&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/KeyValueScanFixture.java Tue Oct 11 17:45:38 2011
@@ -34,7 +34,7 @@ import java.util.List;
  * to the provided comparator, and then the whole thing pretends
  * to be a store file scanner.
  */
-public class KeyValueScanFixture extends AbstractKeyValueScanner {
+public class KeyValueScanFixture extends NonLazyKeyValueScanner {
   ArrayList<KeyValue> data;
   Iterator<KeyValue> iter = null;
   KeyValue current = null;

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java?rev=1181983&r1=1181982&r2=1181983&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeyValueHeap.java Tue Oct 11 17:45:38 2011
@@ -208,7 +208,7 @@ public class TestKeyValueHeap extends HB
     }
   }
 
-  private static class Scanner extends AbstractKeyValueScanner {
+  private static class Scanner extends NonLazyKeyValueScanner {
     private Iterator<KeyValue> iter;
     private KeyValue current;
     private boolean closed = false;

Modified: hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=1181983&r1=1181982&r2=1181983&view=diff
==============================================================================
--- hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original)
+++ hbase/branches/0.89/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java Tue Oct 11 17:45:38 2011
@@ -529,6 +529,7 @@ public class TestMemStore extends TestCa
    * @throws InterruptedException
    */
   public void testGetNextRow() throws Exception {
+    ReadWriteConsistencyControl.resetThreadReadPoint();
     addRows(this.memstore);
     // Add more versions to make it a little more interesting.
     Thread.sleep(1);