You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ra...@apache.org on 2010/07/13 23:01:12 UTC

svn commit: r963862 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/test/java/org/apache/hadoop/hbase/client/ src/test/java/org/apache/hadoop/hbase/regionserver/

Author: rawson
Date: Tue Jul 13 21:01:11 2010
New Revision: 963862

URL: http://svn.apache.org/viewvc?rev=963862&view=rev
Log:
HBASE-2265  HFile and Memstore should maintain minimum and maximum timestamps

Added:
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=963862&r1=963861&r2=963862&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Tue Jul 13 21:01:11 2010
@@ -760,6 +760,8 @@ Release 0.21.0 - Unreleased
                in a single roundtrip (Kannan via Ryan)
    HBASE-2828  HTable unnecessarily coupled with HMaster
                (Nicolas Spiegelberg via Stack)
+   HBASE-2265  HFile and Memstore should maintain minimum and maximum timestamps
+   	       (Pranav via Ryan)
 
   NEW FEATURES
    HBASE-1961  HBase EC2 scripts

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=963862&r1=963861&r2=963862&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java Tue Jul 13 21:01:11 2010
@@ -925,6 +925,15 @@ public class KeyValue implements Writabl
   }
 
   /**
+   *
+   * @return True if this KV is a delete family or column type.
+   */
+  public boolean isDeleteColumnOrFamily() {
+    int t = getType();
+    return t == Type.DeleteColumn.getCode() || t == Type.DeleteFamily.getCode();
+  }
+
+  /**
    * Primarily for use client-side.  Returns the family of this KeyValue in a
    * new byte array.<p>
    *

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java?rev=963862&r1=963861&r2=963862&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/KeyValueHeap.java Tue Jul 13 21:01:11 2010
@@ -41,7 +41,7 @@ import java.util.PriorityQueue;
  * as an InternalScanner at the Store level, you will get runtime exceptions.
  */
 public class KeyValueHeap implements KeyValueScanner, InternalScanner {
-  private PriorityQueue<KeyValueScanner> heap;
+  private PriorityQueue<KeyValueScanner> heap = null;
   private KeyValueScanner current = null;
   private KVScannerComparator comparator;
 
@@ -51,22 +51,25 @@ public class KeyValueHeap implements Key
    * @param scanners
    * @param comparator
    */
-  public KeyValueHeap(List<? extends KeyValueScanner> scanners, KVComparator comparator) {
+  public KeyValueHeap(List<? extends KeyValueScanner> scanners,
+      KVComparator comparator) {
     this.comparator = new KVScannerComparator(comparator);
-    this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
-        this.comparator);
-    for (KeyValueScanner scanner : scanners) {
-      if (scanner.peek() != null) {
-        this.heap.add(scanner);
-      } else {
-        scanner.close();
+    if (!scanners.isEmpty()) {
+      this.heap = new PriorityQueue<KeyValueScanner>(scanners.size(),
+          this.comparator);
+      for (KeyValueScanner scanner : scanners) {
+        if (scanner.peek() != null) {
+          this.heap.add(scanner);
+        } else {
+          scanner.close();
+        }
       }
+      this.current = heap.poll();
     }
-    this.current = heap.poll();
   }
 
   public KeyValue peek() {
-    if(this.current == null) {
+    if (this.current == null) {
       return null;
     }
     return this.current.peek();
@@ -78,12 +81,12 @@ public class KeyValueHeap implements Key
     }
     KeyValue kvReturn = this.current.next();
     KeyValue kvNext = this.current.peek();
-    if(kvNext == null) {
+    if (kvNext == null) {
       this.current.close();
       this.current = this.heap.poll();
     } else {
       KeyValueScanner topScanner = this.heap.peek();
-      if(topScanner == null ||
+      if (topScanner == null ||
           this.comparator.compare(kvNext, topScanner.peek()) > 0) {
         this.heap.add(this.current);
         this.current = this.heap.poll();
@@ -104,6 +107,9 @@ public class KeyValueHeap implements Key
    * @return true if there are more keys, false if all scanners are done
    */
   public boolean next(List<KeyValue> result, int limit) throws IOException {
+    if (this.current == null) {
+      return false;
+    }
     InternalScanner currentAsInternal = (InternalScanner)this.current;
     currentAsInternal.next(result, limit);
     KeyValue pee = this.current.peek();
@@ -160,12 +166,14 @@ public class KeyValueHeap implements Key
   }
 
   public void close() {
-    if(this.current != null) {
+    if (this.current != null) {
       this.current.close();
     }
-    KeyValueScanner scanner;
-    while((scanner = this.heap.poll()) != null) {
-      scanner.close();
+    if (this.heap != null) {
+      KeyValueScanner scanner;
+      while ((scanner = this.heap.poll()) != null) {
+        scanner.close();
+      }
     }
   }
 
@@ -178,10 +186,10 @@ public class KeyValueHeap implements Key
    * automatically closed and removed from the heap.
    * @param seekKey KeyValue to seek at or after
    * @return true if KeyValues exist at or after specified key, false if not
-   * @throws IOException 
+   * @throws IOException
    */
   public boolean seek(KeyValue seekKey) throws IOException {
-    if(this.current == null) {
+    if (this.current == null) {
       return false;
     }
     this.heap.add(this.current);

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java?rev=963862&r1=963861&r2=963862&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/MemStore.java Tue Jul 13 21:01:11 2010
@@ -39,6 +39,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.regionserver.DeleteCompare.DeleteCode;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -81,6 +82,9 @@ public class MemStore implements HeapSiz
   // Used to track own heapSize
   final AtomicLong size;
 
+  TimeRangeTracker timeRangeTracker;
+  TimeRangeTracker snapshotTimeRangeTracker;
+
   /**
    * Default constructor. Used for tests.
    */
@@ -99,6 +103,8 @@ public class MemStore implements HeapSiz
     this.comparatorIgnoreType = this.comparator.getComparatorIgnoringType();
     this.kvset = new KeyValueSkipListSet(c);
     this.snapshot = new KeyValueSkipListSet(c);
+    timeRangeTracker = new TimeRangeTracker();
+    snapshotTimeRangeTracker = new TimeRangeTracker();
     this.size = new AtomicLong(DEEP_OVERHEAD);
   }
 
@@ -128,6 +134,8 @@ public class MemStore implements HeapSiz
         if (!this.kvset.isEmpty()) {
           this.snapshot = this.kvset;
           this.kvset = new KeyValueSkipListSet(this.comparator);
+          this.snapshotTimeRangeTracker = this.timeRangeTracker;
+          this.timeRangeTracker = new TimeRangeTracker();
           // Reset heap to not include any keys
           this.size.set(DEEP_OVERHEAD);
         }
@@ -167,6 +175,7 @@ public class MemStore implements HeapSiz
       // create a new snapshot and let the old one go.
       if (!ss.isEmpty()) {
         this.snapshot = new KeyValueSkipListSet(this.comparator);
+        this.snapshotTimeRangeTracker = new TimeRangeTracker();
       }
     } finally {
       this.lock.writeLock().unlock();
@@ -183,6 +192,7 @@ public class MemStore implements HeapSiz
     this.lock.readLock().lock();
     try {
       s = heapSizeChange(kv, this.kvset.add(kv));
+      timeRangeTracker.includeTimestamp(kv);
       this.size.addAndGet(s);
     } finally {
       this.lock.readLock().unlock();
@@ -198,9 +208,9 @@ public class MemStore implements HeapSiz
   long delete(final KeyValue delete) {
     long s = 0;
     this.lock.readLock().lock();
-
     try {
       s += heapSizeChange(delete, this.kvset.add(delete));
+      timeRangeTracker.includeTimestamp(delete);
     } finally {
       this.lock.readLock().unlock();
     }
@@ -487,6 +497,19 @@ public class MemStore implements HeapSiz
     return false;
   }
 
+  /**
+   * Check if this memstore may contain the required keys
+   * @param scan
+   * @return False if the key definitely does not exist in this Memstore
+   */
+  public boolean shouldSeek(Scan scan) {
+    return timeRangeTracker.includesTimeRange(scan.getTimeRange()) ||
+        snapshotTimeRangeTracker.includesTimeRange(scan.getTimeRange());
+  }
+
+  public TimeRangeTracker getSnapshotTimeRangeTracker() {
+    return this.snapshotTimeRangeTracker;
+  }
 
   /*
    * MemStoreScanner implements the KeyValueScanner.
@@ -520,7 +543,7 @@ public class MemStore implements HeapSiz
       StoreScanner level with coordination with MemStoreScanner.
 
     */
-    
+
     MemStoreScanner() {
       super();
 
@@ -531,7 +554,7 @@ public class MemStore implements HeapSiz
       KeyValue ret = null;
       long readPoint = ReadWriteConsistencyControl.getThreadReadPoint();
       //DebugPrint.println( " MS@" + hashCode() + ": threadpoint = " + readPoint);
-      
+
       while (ret == null && it.hasNext()) {
         KeyValue v = it.next();
         if (v.getMemstoreTS() <= readPoint) {
@@ -566,7 +589,7 @@ public class MemStore implements HeapSiz
       //DebugPrint.println( " MS@" + hashCode() + " snapshot seek: " + snapshotNextRow + " with size = " +
       //    snapshot.size() + " threadread = " + readPoint);
 
-      
+
       KeyValue lowest = getLowest();
 
       // has data := (lowest != null)
@@ -631,7 +654,7 @@ public class MemStore implements HeapSiz
 
   public final static long FIXED_OVERHEAD = ClassSize.align(
       ClassSize.OBJECT + (7 * ClassSize.REFERENCE));
-  
+
   public final static long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
       ClassSize.REENTRANT_LOCK + ClassSize.ATOMIC_LONG +
       ClassSize.COPYONWRITE_ARRAYSET + ClassSize.COPYONWRITE_ARRAYLIST +

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=963862&r1=963861&r2=963862&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Tue Jul 13 21:01:11 2010
@@ -212,7 +212,7 @@ public class Store implements HeapSize {
     return new Path(tabledir, new Path(encodedName,
       new Path(Bytes.toString(family))));
   }
-  
+
   /**
    * Return the directory in which this store stores its
    * StoreFiles
@@ -417,15 +417,17 @@ public class Store implements HeapSize {
    * Write out current snapshot.  Presumes {@link #snapshot()} has been called
    * previously.
    * @param logCacheFlushId flush sequence number
+   * @param snapshot
    * @return true if a compaction is needed
    * @throws IOException
    */
   private StoreFile flushCache(final long logCacheFlushId,
-                               SortedSet<KeyValue> snapshot) throws IOException {
+      SortedSet<KeyValue> snapshot,
+      TimeRangeTracker snapshotTimeRangeTracker) throws IOException {
     // If an exception happens flushing, we let it out without clearing
     // the memstore snapshot.  The old snapshot will be returned when we say
     // 'snapshot', the next time flush comes around.
-    return internalFlushCache(snapshot, logCacheFlushId);
+    return internalFlushCache(snapshot, logCacheFlushId, snapshotTimeRangeTracker);
   }
 
   /*
@@ -435,7 +437,8 @@ public class Store implements HeapSize {
    * @throws IOException
    */
   private StoreFile internalFlushCache(final SortedSet<KeyValue> set,
-                                       final long logCacheFlushId)
+      final long logCacheFlushId,
+      TimeRangeTracker snapshotTimeRangeTracker)
       throws IOException {
     StoreFile.Writer writer = null;
     long flushed = 0;
@@ -450,6 +453,7 @@ public class Store implements HeapSize {
     synchronized (flushLock) {
       // A. Write the map out to the disk
       writer = createWriterInTmp(set.size());
+      writer.setTimeRangeTracker(snapshotTimeRangeTracker);
       int entries = 0;
       try {
         for (KeyValue kv: set) {
@@ -466,12 +470,12 @@ public class Store implements HeapSize {
         writer.close();
       }
     }
-    
+
     // Write-out finished successfully, move into the right spot
     Path dstPath = StoreFile.getUniqueFile(fs, homedir);
     LOG.info("Renaming flushed file at " + writer.getPath() + " to " + dstPath);
     fs.rename(writer.getPath(), dstPath);
-    
+
     StoreFile sf = new StoreFile(this.fs, dstPath, blockcache,
       this.conf, this.family.getBloomFilterType(), this.inMemory);
     StoreFile.Reader r = sf.createReader();
@@ -1436,6 +1440,7 @@ public class Store implements HeapSize {
     private long cacheFlushId;
     private SortedSet<KeyValue> snapshot;
     private StoreFile storeFile;
+    private TimeRangeTracker snapshotTimeRangeTracker;
 
     private StoreFlusherImpl(long cacheFlushId) {
       this.cacheFlushId = cacheFlushId;
@@ -1445,11 +1450,12 @@ public class Store implements HeapSize {
     public void prepare() {
       memstore.snapshot();
       this.snapshot = memstore.getSnapshot();
+      this.snapshotTimeRangeTracker = memstore.getSnapshotTimeRangeTracker();
     }
 
     @Override
     public void flushCache() throws IOException {
-      storeFile = Store.this.flushCache(cacheFlushId, snapshot);
+      storeFile = Store.this.flushCache(cacheFlushId, snapshot, snapshotTimeRangeTracker);
     }
 
     @Override

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=963862&r1=963861&r2=963862&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Tue Jul 13 21:01:11 2010
@@ -24,9 +24,9 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HalfStoreFileReader;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -38,7 +38,9 @@ import org.apache.hadoop.hbase.util.Bloo
 import org.apache.hadoop.hbase.util.ByteBloomFilter;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Hash;
+import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.io.RawComparator;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Function;
@@ -105,6 +107,8 @@ public class StoreFile {
   public static final byte [] MAJOR_COMPACTION_KEY = Bytes.toBytes("MAJOR_COMPACTION_KEY");
   /** Bloom filter Type in FileInfo */
   static final byte[] BLOOM_FILTER_TYPE_KEY = Bytes.toBytes("BLOOM_FILTER_TYPE");
+  /** Key for Timerange information in metadata*/
+  static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
 
   /** Meta data block name for bloom filter meta-info (ie: bloom params/specs) */
   static final String BLOOM_FILTER_META_KEY = "BLOOM_FILTER_META";
@@ -411,6 +415,17 @@ public class StoreFile {
       this.reader.loadBloomfilter();
     }
 
+    try {
+      byte [] timerangeBytes = metadataMap.get(TIMERANGE_KEY);
+      if (b != null) {
+        this.reader.timeRangeTracker = new TimeRangeTracker();
+        Writables.copyWritable(timerangeBytes, this.reader.timeRangeTracker);
+      }
+    } catch (IllegalArgumentException e) {
+      LOG.error("Error reading timestamp range data from meta -- " +
+          "proceeding without", e);
+      this.reader.timeRangeTracker = null;
+    }
     return this.reader;
   }
 
@@ -647,6 +662,14 @@ public class StoreFile {
     private KVComparator kvComparator;
     private KeyValue lastKv = null;
     private byte[] lastByteArray = null;
+    TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
+    /* isTimeRangeTrackerSet keeps track if the timeRange has already been set
+     * When flushing a memstore, we set TimeRange and use this variable to
+     * indicate that it doesn't need to be calculated again while
+     * appending KeyValues.
+     * It is not set in cases of compactions when it is recalculated using only
+     * the appended KeyValues*/
+    boolean isTimeRangeTrackerSet = false;
 
     protected HFile.Writer writer;
     /**
@@ -693,7 +716,49 @@ public class StoreFile {
     public void appendMetadata(final long maxSequenceId, final boolean majorCompaction)
     throws IOException {
       writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
-      writer.appendFileInfo(MAJOR_COMPACTION_KEY, Bytes.toBytes(majorCompaction));
+      writer.appendFileInfo(MAJOR_COMPACTION_KEY,
+          Bytes.toBytes(majorCompaction));
+      appendTimeRangeMetadata();
+    }
+
+    /**
+     * Add TimestampRange to Metadata
+     */
+    public void appendTimeRangeMetadata() throws IOException {
+      appendFileInfo(TIMERANGE_KEY,WritableUtils.toByteArray(timeRangeTracker));
+    }
+
+    /**
+     * Set TimeRangeTracker
+     * @param trt
+     */
+    public void setTimeRangeTracker(final TimeRangeTracker trt) {
+      this.timeRangeTracker = trt;
+      isTimeRangeTrackerSet = true;
+    }
+
+    /**
+     * If the timeRangeTracker is not set,
+     * update TimeRangeTracker to include the timestamp of this key
+     * @param kv
+     * @throws IOException
+     */
+    public void includeInTimeRangeTracker(final KeyValue kv) {
+      if (!isTimeRangeTrackerSet) {
+        timeRangeTracker.includeTimestamp(kv);
+      }
+    }
+
+    /**
+     * If the timeRangeTracker is not set,
+     * update TimeRangeTracker to include the timestamp of this key
+     * @param key
+     * @throws IOException
+     */
+    public void includeInTimeRangeTracker(final byte [] key) {
+      if (!isTimeRangeTrackerSet) {
+        timeRangeTracker.includeTimestamp(key);
+      }
     }
 
     public void append(final KeyValue kv) throws IOException {
@@ -744,6 +809,7 @@ public class StoreFile {
         }
       }
       writer.append(kv);
+      includeInTimeRangeTracker(kv);
     }
 
     public Path getPath() {
@@ -759,6 +825,7 @@ public class StoreFile {
         }
       }
       writer.append(key, value);
+      includeInTimeRangeTracker(key);
     }
 
     public void close() throws IOException {
@@ -794,6 +861,7 @@ public class StoreFile {
     protected BloomFilter bloomFilter = null;
     protected BloomType bloomFilterType;
     private final HFile.Reader reader;
+    protected TimeRangeTracker timeRangeTracker = null;
 
     public Reader(FileSystem fs, Path path, BlockCache blockCache, boolean inMemory)
         throws IOException {
@@ -834,13 +902,28 @@ public class StoreFile {
       reader.close();
     }
 
-    public boolean shouldSeek(final byte[] row,
-                              final SortedSet<byte[]> columns) {
+    public boolean shouldSeek(Scan scan, final SortedSet<byte[]> columns) {
+        return (passesTimerangeFilter(scan) && passesBloomFilter(scan,columns));
+    }
 
-      if (this.bloomFilter == null) {
+    /**
+     * Check if this storeFile may contain keys within the TimeRange
+     * @param scan
+     * @return False if it definitely does not exist in this StoreFile
+     */
+    private boolean passesTimerangeFilter(Scan scan) {
+      if (timeRangeTracker == null) {
         return true;
+      } else {
+        return timeRangeTracker.includesTimeRange(scan.getTimeRange());
       }
+    }
 
+    private boolean passesBloomFilter(Scan scan, final SortedSet<byte[]> columns) {
+      if (this.bloomFilter == null || !scan.isGetScan()) {
+        return true;
+      }
+      byte[] row = scan.getStartRow();
       byte[] key;
       switch (this.bloomFilterType) {
         case ROW:

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java?rev=963862&r1=963861&r2=963862&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFileScanner.java Tue Jul 13 21:01:11 2010
@@ -23,6 +23,7 @@ package org.apache.hadoop.hbase.regionse
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 
 import java.io.IOException;
@@ -131,9 +132,8 @@ class StoreFileScanner implements KeyVal
     return true;
   }
 
-  // Bloom filter hook.
-  public boolean shouldSeek(final byte[] row,
-                            final SortedSet<byte[]> columns) {
-    return reader.shouldSeek(row, columns);
+  // StoreFile filter hook.
+  public boolean shouldSeek(Scan scan, final SortedSet<byte[]> columns) {
+    return reader.shouldSeek(scan, columns);
   }
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=963862&r1=963861&r2=963862&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Tue Jul 13 21:01:11 2010
@@ -154,17 +154,17 @@ class StoreScanner implements KeyValueSc
     List<KeyValueScanner> scanners =
       new ArrayList<KeyValueScanner>(sfScanners.size()+1);
 
-    // exclude scan files that have failed file filters
+    // include only those scan files which pass all filters
     for (StoreFileScanner sfs : sfScanners) {
-      if (isGet &&
-          !sfs.shouldSeek(scan.getStartRow(), columns)) {
-        continue; // exclude this hfs
+      if (sfs.shouldSeek(scan, columns)) {
+        scanners.add(sfs);
       }
-      scanners.add(sfs);
     }
 
     // Then the memstore scanners
-    scanners.addAll(this.store.memstore.getScanners());
+    if (this.store.memstore.shouldSeek(scan)) {
+      scanners.addAll(this.store.memstore.getScanners());
+    }
     return scanners;
   }
 

Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java?rev=963862&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/TimeRangeTracker.java Tue Jul 13 21:01:11 2010
@@ -0,0 +1,147 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.Type;
+import org.apache.hadoop.hbase.io.TimeRange;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Stores the minimum and maximum timestamp values.
+ * Can be used to find if any given time range overlaps with its time range
+ * MemStores use this class to track its minimum and maximum timestamps.
+ * When writing StoreFiles, this information is stored in meta blocks and used
+ * at read time to match against the required TimeRange
+ */
+public class TimeRangeTracker implements Writable {
+
+  long minimumTimestamp = -1;
+  long maximumTimestamp = -1;
+
+  /**
+   * Default constructor.
+   * Initializes TimeRange to be null
+   */
+  public TimeRangeTracker() {
+
+  }
+
+  /**
+   * Copy Constructor
+   * @param trt source TimeRangeTracker
+   */
+  public TimeRangeTracker(final TimeRangeTracker trt) {
+    this.minimumTimestamp = trt.getMinimumTimestamp();
+    this.maximumTimestamp = trt.getMaximumTimestamp();
+  }
+
+  public TimeRangeTracker(long minimumTimestamp, long maximumTimestamp) {
+    this.minimumTimestamp = minimumTimestamp;
+    this.maximumTimestamp = maximumTimestamp;
+  }
+
+  /**
+   * Update the current TimestampRange to include the timestamp from KeyValue
+   * If the Key is of type DeleteColumn or DeleteFamily, it includes the
+   * entire time range from 0 to timestamp of the key.
+   * @param kv the KeyValue to include
+   */
+  public void includeTimestamp(final KeyValue kv) {
+    includeTimestamp(kv.getTimestamp());
+    if (kv.isDeleteColumnOrFamily()) {
+      includeTimestamp(0);
+    }
+  }
+
+  /**
+   * Update the current TimestampRange to include the timestamp from Key.
+   * If the Key is of type DeleteColumn or DeleteFamily, it includes the
+   * entire time range from 0 to timestamp of the key.
+   * @param key
+   */
+  public void includeTimestamp(final byte[] key) {
+    includeTimestamp(Bytes.toLong(key,key.length-KeyValue.TIMESTAMP_TYPE_SIZE));
+    int type = key[key.length - 1];
+    if (type == Type.DeleteColumn.getCode() ||
+        type == Type.DeleteFamily.getCode()) {
+      includeTimestamp(0);
+    }
+  }
+
+  /**
+   * If required, update the current TimestampRange to include timestamp
+   * @param timestamp the timestamp value to include
+   */
+  private void includeTimestamp(final long timestamp) {
+    if (maximumTimestamp==-1) {
+      minimumTimestamp = timestamp;
+      maximumTimestamp = timestamp;
+    }
+    else if (minimumTimestamp > timestamp) {
+      minimumTimestamp = timestamp;
+    }
+    else if (maximumTimestamp < timestamp) {
+      maximumTimestamp = timestamp;
+    }
+    return;
+  }
+
+  /**
+   * Check if the range has any overlap with TimeRange
+   * @param tr TimeRange
+   * @return True if there is overlap, false otherwise
+   */
+  public boolean includesTimeRange(final TimeRange tr) {
+    return (this.minimumTimestamp < tr.getMax() &&
+        this.maximumTimestamp >= tr.getMin());
+  }
+
+  /**
+   * @return the minimumTimestamp
+   */
+  public long getMinimumTimestamp() {
+    return minimumTimestamp;
+  }
+
+  /**
+   * @return the maximumTimestamp
+   */
+  public long getMaximumTimestamp() {
+    return maximumTimestamp;
+  }
+
+  public void write(final DataOutput out) throws IOException {
+    out.writeLong(minimumTimestamp);
+    out.writeLong(maximumTimestamp);
+  }
+
+  public void readFields(final DataInput in) throws IOException {
+    this.minimumTimestamp = in.readLong();
+    this.maximumTimestamp = in.readLong();
+  }
+
+}
+

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java?rev=963862&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestMultipleTimestamps.java Tue Jul 13 21:01:11 2010
@@ -0,0 +1,304 @@
+/**
+ * Copyright 2009 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Run tests related to {@link TimestampsFilter} using HBase client APIs.
+ * Sets up the HBase mini cluster once at start. Each creates a table
+ * named for the method and does its stuff against that.
+ */
+public class TestMultipleTimestamps {
+  final Log LOG = LogFactory.getLog(getClass());
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(3);
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    // Nothing to do.
+  }
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @After
+  public void tearDown() throws Exception {
+    // Nothing to do.
+  }
+
+  @Test
+  public void testWithVersionDeletes() throws Exception {
+
+    // first test from memstore (without flushing).
+    testWithVersionDeletes(false);
+
+    // run same test against HFiles (by forcing a flush).
+    testWithVersionDeletes(true);
+  }
+
+  public void testWithVersionDeletes(boolean flushTables) throws IOException {
+    byte [] TABLE = Bytes.toBytes("testWithVersionDeletes_" +
+        (flushTables ? "flush" : "noflush"));
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    // For row:0, col:0: insert versions 1 through 5.
+    putNVersions(ht, FAMILY, 0, 0, 1, 5);
+
+    if (flushTables) {
+      flush();
+    }
+
+    // delete version 4.
+    deleteOneVersion(ht, FAMILY, 0, 0, 4);
+
+    // request a bunch of versions including the deleted version. We should
+    // only get back entries for the versions that exist.
+    KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L, 4L, 5L));
+    assertEquals(3, kvs.length);
+    checkOneCell(kvs[0], FAMILY, 0, 0, 5);
+    checkOneCell(kvs[1], FAMILY, 0, 0, 3);
+    checkOneCell(kvs[2], FAMILY, 0, 0, 2);
+  }
+
+  @Test
+  public void testWithMultipleVersionDeletes() throws IOException {
+    byte [] TABLE = Bytes.toBytes("testWithMultipleVersionDeletes");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    // For row:0, col:0: insert versions 1 through 5.
+    putNVersions(ht, FAMILY, 0, 0, 1, 5);
+
+    flush();
+
+    // delete all versions before 4.
+    deleteAllVersionsBefore(ht, FAMILY, 0, 0, 4);
+
+    // request a bunch of versions including the deleted version. We should
+    // only get back entries for the versions that exist.
+    KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
+    assertEquals(0, kvs.length);
+  }
+
+  @Test
+  public void testWithColumnDeletes() throws IOException {
+    byte [] TABLE = Bytes.toBytes("testWithColumnDeletes");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    // For row:0, col:0: insert versions 1 through 5.
+    putNVersions(ht, FAMILY, 0, 0, 1, 5);
+
+    flush();
+
+    // delete all versions before 4.
+    deleteColumn(ht, FAMILY, 0, 0);
+
+    // request a bunch of versions including the deleted version. We should
+    // only get back entries for the versions that exist.
+    KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
+    assertEquals(0, kvs.length);
+  }
+
+  @Test
+  public void testWithFamilyDeletes() throws IOException {
+    byte [] TABLE = Bytes.toBytes("testWithFamilyDeletes");
+    byte [] FAMILY = Bytes.toBytes("event_log");
+    byte [][] FAMILIES = new byte[][] { FAMILY };
+
+    // create table; set versions to max...
+    HTable ht = TEST_UTIL.createTable(TABLE, FAMILIES, Integer.MAX_VALUE);
+
+    // For row:0, col:0: insert versions 1 through 5.
+    putNVersions(ht, FAMILY, 0, 0, 1, 5);
+
+    flush();
+
+    // delete all versions before 4.
+    deleteFamily(ht, FAMILY, 0);
+
+    // request a bunch of versions including the deleted version. We should
+    // only get back entries for the versions that exist.
+    KeyValue kvs[] = getNVersions(ht, FAMILY, 0, 0, Arrays.asList(2L, 3L));
+    assertEquals(0, kvs.length);
+  }
+
+  // Flush tables. Since flushing is asynchronous, sleep for a bit.
+  private void flush() throws IOException {
+    TEST_UTIL.flush();
+    try {
+      Thread.sleep(3000);
+    } catch (InterruptedException i) {
+      // ignore
+    }
+  }
+
+  /**
+   * Assert that the passed in KeyValue has expected contents for the
+   * specified row, column & timestamp.
+   */
+  private void checkOneCell(KeyValue kv, byte[] cf,
+      int rowIdx, int colIdx, long ts) {
+
+    String ctx = "rowIdx=" + rowIdx + "; colIdx=" + colIdx + "; ts=" + ts;
+
+    assertEquals("Row mismatch which checking: " + ctx,
+        "row:"+ rowIdx, Bytes.toString(kv.getRow()));
+
+    assertEquals("ColumnFamily mismatch while checking: " + ctx,
+        Bytes.toString(cf), Bytes.toString(kv.getFamily()));
+
+    assertEquals("Column qualifier mismatch while checking: " + ctx,
+        "column:" + colIdx,
+        Bytes.toString(kv.getQualifier()));
+
+    assertEquals("Timestamp mismatch while checking: " + ctx,
+        ts, kv.getTimestamp());
+
+    assertEquals("Value mismatch while checking: " + ctx,
+        "value-version-" + ts, Bytes.toString(kv.getValue()));
+  }
+
+  /**
+   * Uses the TimestampFilter on a Get to request a specified list of
+   * versions for the row/column specified by rowIdx & colIdx.
+   *
+   */
+  private  KeyValue[] getNVersions(HTable ht, byte[] cf, int rowIdx,
+      int colIdx, List<Long> versions)
+  throws IOException {
+    byte row[] = Bytes.toBytes("row:" + rowIdx);
+    byte column[] = Bytes.toBytes("column:" + colIdx);
+    Get get = new Get(row);
+    get.addColumn(cf, column);
+    get.setMaxVersions();
+    get.setTimeRange(Collections.min(versions), Collections.max(versions)+1);
+    Result result = ht.get(get);
+
+    return result.raw();
+  }
+
+  /**
+   * Insert in specific row/column versions with timestamps
+   * versionStart..versionEnd.
+   */
+  private void putNVersions(HTable ht, byte[] cf, int rowIdx, int colIdx,
+      long versionStart, long versionEnd)
+  throws IOException {
+    byte row[] = Bytes.toBytes("row:" + rowIdx);
+    byte column[] = Bytes.toBytes("column:" + colIdx);
+    Put put = new Put(row);
+
+    for (long idx = versionStart; idx <= versionEnd; idx++) {
+      put.add(cf, column, idx, Bytes.toBytes("value-version-" + idx));
+    }
+
+    ht.put(put);
+  }
+
+  /**
+   * For row/column specified by rowIdx/colIdx, delete the cell
+   * corresponding to the specified version.
+   */
+  private void deleteOneVersion(HTable ht, byte[] cf, int rowIdx,
+      int colIdx, long version)
+  throws IOException {
+    byte row[] = Bytes.toBytes("row:" + rowIdx);
+    byte column[] = Bytes.toBytes("column:" + colIdx);
+    Delete del = new Delete(row);
+    del.deleteColumn(cf, column, version);
+    ht.delete(del);
+  }
+
+  /**
+   * For row/column specified by rowIdx/colIdx, delete all cells
+   * preceeding the specified version.
+   */
+  private void deleteAllVersionsBefore(HTable ht, byte[] cf, int rowIdx,
+      int colIdx, long version)
+  throws IOException {
+    byte row[] = Bytes.toBytes("row:" + rowIdx);
+    byte column[] = Bytes.toBytes("column:" + colIdx);
+    Delete del = new Delete(row);
+    del.deleteColumns(cf, column, version);
+    ht.delete(del);
+  }
+
+  private void deleteColumn(HTable ht, byte[] cf, int rowIdx, int colIdx) throws IOException {
+    byte row[] = Bytes.toBytes("row:" + rowIdx);
+    byte column[] = Bytes.toBytes("column:" + colIdx);
+    Delete del = new Delete(row);
+    del.deleteColumns(cf, column);
+    ht.delete(del);
+  }
+
+  private void deleteFamily(HTable ht, byte[] cf, int rowIdx) throws IOException {
+    byte row[] = Bytes.toBytes("row:" + rowIdx);
+    Delete del = new Delete(row);
+    del.deleteFamily(cf);
+    ht.delete(del);
+  }
+}
+

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=963862&r1=963861&r2=963862&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java Tue Jul 13 21:01:11 2010
@@ -163,7 +163,7 @@ public class TestMemStore extends TestCa
 
   /**
    * A simple test which verifies the 3 possible states when scanning across snapshot.
-   * @throws IOException 
+   * @throws IOException
    */
   public void testScanAcrossSnapshot2() throws IOException {
     // we are going to the scanning across snapshot with two kvs
@@ -210,7 +210,7 @@ public class TestMemStore extends TestCa
       throws IOException {
     scanner.seek(KeyValue.createFirstOnRow(new byte[]{}));
     List<KeyValue> returned = Lists.newArrayList();
-    
+
     while (true) {
       KeyValue next = scanner.next();
       if (next == null) break;
@@ -313,15 +313,15 @@ public class TestMemStore extends TestCa
 
     // COMPLETE INSERT 2
     rwcc.completeMemstoreInsert(w);
-    
+
     // NOW SHOULD SEE NEW KVS IN ADDITION TO OLD KVS.
     // See HBASE-1485 for discussion about what we should do with
     // the duplicate-TS inserts
     ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
     s = this.memstore.getScanners().get(0);
-    assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12});    
+    assertScannerResults(s, new KeyValue[]{kv21, kv11, kv22, kv12});
   }
-  
+
   /**
    * When we insert a higher-memstoreTS deletion of a cell but with
    * the same timestamp, we still need to provide consistent reads
@@ -369,9 +369,9 @@ public class TestMemStore extends TestCa
     // NOW WE SHOULD SEE DELETE
     ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
     s = this.memstore.getScanners().get(0);
-    assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12});    
+    assertScannerResults(s, new KeyValue[]{kv11, kvDel, kv12});
   }
-  
+
 
   private static class ReadOwnWritesTester extends Thread {
     static final int NUM_TRIES = 1000;
@@ -454,7 +454,7 @@ public class TestMemStore extends TestCa
     }
   }
 
-  /** 
+  /**
    * Test memstore snapshots
    * @throws IOException
    */
@@ -785,7 +785,7 @@ public class TestMemStore extends TestCa
     expected.add(put2);
     expected.add(put1);
 
-    
+
     assertEquals(4, memstore.kvset.size());
     int i = 0;
     for (KeyValue kv: memstore.kvset) {
@@ -825,7 +825,7 @@ public class TestMemStore extends TestCa
     expected.add(put3);
 
 
-    
+
     assertEquals(5, memstore.kvset.size());
     int i = 0;
     for (KeyValue kv: memstore.kvset) {
@@ -884,9 +884,42 @@ public class TestMemStore extends TestCa
   }
 
 
+  ////////////////////////////////////
+  //Test for timestamps
+  ////////////////////////////////////
+
+  /**
+   * Test to ensure correctness when using Memstore with multiple timestamps
+   */
+  public void testMultipleTimestamps() throws IOException {
+    long[] timestamps = new long[] {20,10,5,1};
+    Scan scan = new Scan();
+
+    for (long timestamp: timestamps)
+      addRows(memstore,timestamp);
+
+    scan.setTimeRange(0, 2);
+    assertTrue(memstore.shouldSeek(scan));
+
+    scan.setTimeRange(20, 82);
+    assertTrue(memstore.shouldSeek(scan));
+
+    scan.setTimeRange(10, 20);
+    assertTrue(memstore.shouldSeek(scan));
+
+    scan.setTimeRange(8, 12);
+    assertTrue(memstore.shouldSeek(scan));
+
+    /*This test is not required for correctness but it should pass when
+     * timestamp range optimization is on*/
+    //scan.setTimeRange(28, 42);
+    //assertTrue(!memstore.shouldSeek(scan));
+  }
+
+
   //////////////////////////////////////////////////////////////////////////////
   // Helpers
-  //////////////////////////////////////////////////////////////////////////////  
+  //////////////////////////////////////////////////////////////////////////////
   private static byte [] makeQualifier(final int i1, final int i2){
     return Bytes.toBytes(Integer.toString(i1) + ";" +
         Integer.toString(i2));
@@ -1008,5 +1041,5 @@ public class TestMemStore extends TestCa
 
   }
 
-  
+
 }

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java?rev=963862&r1=963861&r2=963862&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java Tue Jul 13 21:01:11 2010
@@ -49,10 +49,12 @@ import org.apache.hadoop.hbase.HConstant
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.security.UnixUserGroupInformation;
 
 import com.google.common.base.Joiner;
@@ -420,4 +422,88 @@ public class TestStore extends TestCase 
     storeFlusher.flushCache();
     storeFlusher.commit();
   }
+
+
+
+  /**
+   * Generate a list of KeyValues for testing based on given parameters
+   * @param timestamps
+   * @param numRows
+   * @param qualifier
+   * @param family
+   * @return
+   */
+  List<KeyValue> getKeyValueSet(long[] timestamps, int numRows,
+      byte[] qualifier, byte[] family) {
+    List<KeyValue> kvList = new ArrayList<KeyValue>();
+    for (int i=1;i<=numRows;i++) {
+      byte[] b = Bytes.toBytes(i);
+      for (long timestamp: timestamps) {
+        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
+      }
+    }
+    return kvList;
+  }
+
+  /**
+   * Test to ensure correctness when using Stores with multiple timestamps
+   * @throws IOException
+   */
+  public void testMultipleTimestamps() throws IOException {
+    int numRows = 1;
+    long[] timestamps1 = new long[] {1,5,10,20};
+    long[] timestamps2 = new long[] {30,80};
+
+    init(this.getName());
+
+    List<KeyValue> kvList1 = getKeyValueSet(timestamps1,numRows, qf1, family);
+    for (KeyValue kv : kvList1) {
+      this.store.add(kv);
+    }
+
+    this.store.snapshot();
+    flushStore(store, id++);
+
+    List<KeyValue> kvList2 = getKeyValueSet(timestamps2,numRows, qf1, family);
+    for(KeyValue kv : kvList2) {
+      this.store.add(kv);
+    }
+
+    NavigableSet<byte[]> columns = new ConcurrentSkipListSet<byte[]>(
+        Bytes.BYTES_COMPARATOR);
+    columns.add(qf1);
+    List<KeyValue> result;
+    Get get = new Get(Bytes.toBytes(1));
+    get.addColumn(family,qf1);
+
+    get.setTimeRange(0,15);
+    result = new ArrayList<KeyValue>();
+    this.store.get(get, columns, result);
+    assertTrue(result.size()>0);
+
+    get.setTimeRange(40,90);
+    result = new ArrayList<KeyValue>();
+    this.store.get(get, columns, result);
+    assertTrue(result.size()>0);
+
+    get.setTimeRange(10,45);
+    result = new ArrayList<KeyValue>();
+    this.store.get(get, columns, result);
+    assertTrue(result.size()>0);
+
+    get.setTimeRange(80,145);
+    result = new ArrayList<KeyValue>();
+    this.store.get(get, columns, result);
+    assertTrue(result.size()>0);
+
+    get.setTimeRange(1,2);
+    result = new ArrayList<KeyValue>();
+    this.store.get(get, columns, result);
+    assertTrue(result.size()>0);
+
+    get.setTimeRange(90,200);
+    result = new ArrayList<KeyValue>();
+    this.store.get(get, columns, result);
+    assertTrue(result.size()==0);
+  }
 }
\ No newline at end of file

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java?rev=963862&r1=963861&r2=963862&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreFile.java Tue Jul 13 21:01:11 2010
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.List;
 import java.util.TreeSet;
 
 import org.apache.commons.logging.Log;
@@ -35,6 +36,7 @@ import org.apache.hadoop.hbase.HBaseTest
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.Reference.Range;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
@@ -354,7 +356,9 @@ public class TestStoreFile extends HBase
       TreeSet<byte[]> columns = new TreeSet<byte[]>();
       columns.add("family:col".getBytes());
 
-      boolean exists = scanner.shouldSeek(row.getBytes(), columns);
+      Scan scan = new Scan(row.getBytes(),row.getBytes());
+      scan.addColumn("family".getBytes(), "family:col".getBytes());
+      boolean exists = scanner.shouldSeek(scan, columns);
       if (i % 2 == 0) {
         if (!exists) falseNeg++;
       } else {
@@ -428,7 +432,9 @@ public class TestStoreFile extends HBase
           TreeSet<byte[]> columns = new TreeSet<byte[]>();
           columns.add(("col" + col).getBytes());
 
-          boolean exists = scanner.shouldSeek(row.getBytes(), columns);
+          Scan scan = new Scan(row.getBytes(),row.getBytes());
+          scan.addColumn("family".getBytes(), ("col"+col).getBytes());
+          boolean exists = scanner.shouldSeek(scan, columns);
           boolean shouldRowExist = i % 2 == 0;
           boolean shouldColExist = j % 2 == 0;
           shouldColExist = shouldColExist || bt[x] == StoreFile.BloomType.ROW;
@@ -496,5 +502,77 @@ public class TestStoreFile extends HBase
     Mockito.doReturn(name).when(mock).toString();
     return mock;
   }
-  
+
+  /**
+   *Generate a list of KeyValues for testing based on given parameters
+   * @param timestamps
+   * @param numRows
+   * @param qualifier
+   * @param family
+   * @return
+   */
+  List<KeyValue> getKeyValueSet(long[] timestamps, int numRows,
+      byte[] qualifier, byte[] family) {
+    List<KeyValue> kvList = new ArrayList<KeyValue>();
+    for (int i=1;i<=numRows;i++) {
+      byte[] b = Bytes.toBytes(i) ;
+      LOG.info(Bytes.toString(b));
+      LOG.info(Bytes.toString(b));
+      for (long timestamp: timestamps)
+      {
+        kvList.add(new KeyValue(b, family, qualifier, timestamp, b));
+      }
+    }
+    return kvList;
+  }
+
+  /**
+   * Test to ensure correctness when using StoreFile with multiple timestamps
+   * @throws IOException
+   */
+  public void testMultipleTimestamps() throws IOException {
+    byte[] family = Bytes.toBytes("familyname");
+    byte[] qualifier = Bytes.toBytes("qualifier");
+    int numRows = 10;
+    long[] timestamps = new long[] {20,10,5,1};
+    Scan scan = new Scan();
+
+    Path storedir = new Path(new Path(this.testDir, "regionname"),
+    "familyname");
+    Path dir = new Path(storedir, "1234567890");
+    StoreFile.Writer writer = StoreFile.createWriter(this.fs, dir, 8 * 1024);
+
+    List<KeyValue> kvList = getKeyValueSet(timestamps,numRows,
+        family, qualifier);
+
+    for (KeyValue kv : kvList) {
+      writer.append(kv);
+    }
+    writer.appendMetadata(0, false);
+    writer.close();
+
+    StoreFile hsf = new StoreFile(this.fs, writer.getPath(), true, conf,
+        StoreFile.BloomType.NONE, false);
+    StoreFile.Reader reader = hsf.createReader();
+    StoreFileScanner scanner = reader.getStoreFileScanner(false, false);
+    TreeSet<byte[]> columns = new TreeSet<byte[]>();
+    columns.add(qualifier);
+
+    scan.setTimeRange(20, 100);
+    assertTrue(scanner.shouldSeek(scan, columns));
+
+    scan.setTimeRange(1, 2);
+    assertTrue(scanner.shouldSeek(scan, columns));
+
+    scan.setTimeRange(8, 10);
+    assertTrue(scanner.shouldSeek(scan, columns));
+
+    scan.setTimeRange(7, 50);
+    assertTrue(scanner.shouldSeek(scan, columns));
+
+    /*This test is not required for correctness but it should pass when
+     * timestamp range optimization is on*/
+    //scan.setTimeRange(27, 50);
+    //assertTrue(!scanner.shouldSeek(scan, columns));
+  }
 }