You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2011/10/21 21:57:08 UTC

svn commit: r1187531 [1/2] - in /hbase/trunk: ./ src/docbkx/ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/client/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/ruby/hbase/ src/test/java/org/apache/hadoop/h...

Author: larsh
Date: Fri Oct 21 19:57:07 2011
New Revision: 1187531

URL: http://svn.apache.org/viewvc?rev=1187531&view=rev
Log:
HBASE-4536  Allow CF to retain deleted rows

Added:
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/docbkx/book.xml
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Attributes.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/trunk/src/main/ruby/hbase/admin.rb
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Oct 21 19:57:07 2011
@@ -2,6 +2,7 @@ HBase Change Log
 Release 0.93.0 - Unreleased
   NEW FEATURE
    HBASE-4460  Support running an embedded ThriftServer within a RegionServer (jgray)
+   HBASE-4536  Allow CF to retain deleted rows (Lars H)
 
   IMPROVEMENT
    HBASE-4132  Extend the WALActionsListener API to accomodate log archival

Modified: hbase/trunk/src/docbkx/book.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/src/docbkx/book.xml?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/docbkx/book.xml (original)
+++ hbase/trunk/src/docbkx/book.xml Fri Oct 21 19:57:07 2011
@@ -670,14 +670,14 @@ admin.enableTable(table);               
     <title>
     Minimum Number of Versions
     </title>
-    <para>Like number of max row versions, the minimum number of row versions to keep is configured per column
+    <para>Like maximum number of row versions, the minimum number of row versions to keep is configured per column
       family via <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/HColumnDescriptor.html">HColumnDescriptor</link>.
       The default for min versions is 0, which means the feature is disabled.
       The minimum number of row versions parameter is used together with the time-to-live parameter and can be combined with the
       number of row versions parameter to allow configurations such as
       "keep the last T minutes worth of data, at most N versions, <emphasis>but keep at least M versions around</emphasis>"
-      (where M is the value for minimum number of row versions, M&lt;=N).
-      This parameter should only be set when time-to-live is enabled for a column family and must be less or equal to the 
+      (where M is the value for minimum number of row versions, M&lt;N).
+      This parameter should only be set when time-to-live is enabled for a column family and must be less than the
       number of row versions.
     </para>
     </section>
@@ -723,6 +723,23 @@ admin.enableTable(table);               
   <para>See <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/HColumnDescriptor.html">HColumnDescriptor</link> for more information.
   </para>
   </section>
+  <section xml:id="cf.keep.deleted">
+  <title>
+  Keeping Deleted Cells
+  </title>
+  <para>ColumnFamilies can optionally keep deleted cells. That means deleted cells can still be retrieved with
+  <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Get.html">Get</link> or
+  <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Scan.html">Scan</link> operations,
+  as long as these operations have a time range specified that ends before the timestamp of any delete that would affect the cells.
+  This allows for point in time queries even in the presence of deletes.
+  </para>
+  <para>
+  Deleted cells are still subject to TTL and there will never be more than "maximum number of versions" deleted cells.
+  A new "raw" scan option returns all deleted rows and the delete markers.
+  </para>
+  <para>See <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/HColumnDescriptor.html">HColumnDescriptor</link> for more information.
+  </para>
+  </section>
   <section xml:id="schema.bloom">
   <title>Bloom Filters</title>
   <para>Bloom Filters can be enabled per-ColumnFamily.

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java Fri Oct 21 19:57:07 2011
@@ -88,6 +88,7 @@ public class HColumnDescriptor implement
   public static final String FOREVER = "FOREVER";
   public static final String REPLICATION_SCOPE = "REPLICATION_SCOPE";
   public static final String MIN_VERSIONS = "MIN_VERSIONS";
+  public static final String KEEP_DELETED_CELLS = "KEEP_DELETED_CELLS";
 
   /**
    * Default compression type.
@@ -117,6 +118,11 @@ public class HColumnDescriptor implement
   public static final boolean DEFAULT_IN_MEMORY = false;
 
   /**
+   * Default setting for preventing deleted cells from being collected immediately.
+   */
+  public static final boolean DEFAULT_KEEP_DELETED = false;
+
+  /**
    * Default setting for whether to use a block cache or not.
    */
   public static final boolean DEFAULT_BLOCKCACHE = true;
@@ -151,6 +157,7 @@ public class HColumnDescriptor implement
       DEFAULT_VALUES.put(BLOCKSIZE, String.valueOf(DEFAULT_BLOCKSIZE));
       DEFAULT_VALUES.put(HConstants.IN_MEMORY, String.valueOf(DEFAULT_IN_MEMORY));
       DEFAULT_VALUES.put(BLOCKCACHE, String.valueOf(DEFAULT_BLOCKCACHE));
+      DEFAULT_VALUES.put(KEEP_DELETED_CELLS, String.valueOf(DEFAULT_KEEP_DELETED));
   }
 
   // Column family name
@@ -265,8 +272,9 @@ public class HColumnDescriptor implement
       final String compression, final boolean inMemory,
       final boolean blockCacheEnabled, final int blocksize,
       final int timeToLive, final String bloomFilter, final int scope) {
-    this(familyName, DEFAULT_MIN_VERSIONS, maxVersions, compression, inMemory,
-        blockCacheEnabled, blocksize, timeToLive, bloomFilter, scope);
+    this(familyName, DEFAULT_MIN_VERSIONS, maxVersions, DEFAULT_KEEP_DELETED,
+        compression, inMemory, blockCacheEnabled, blocksize, timeToLive,
+        bloomFilter, scope);
   }
 
   /**
@@ -275,6 +283,8 @@ public class HColumnDescriptor implement
    * letter -- and may not contain a <code>:<code>
    * @param minVersions Minimum number of versions to keep
    * @param maxVersions Maximum number of versions to keep
+   * @param keepDeletedCells Whether to retain deleted cells until they expire
+   *        up to maxVersions versions.
    * @param compression Compression type
    * @param inMemory If true, column data should be kept in an HRegionServer's
    * cache
@@ -292,8 +302,9 @@ public class HColumnDescriptor implement
    * a <code>:</code>
    * @throws IllegalArgumentException if the number of versions is &lt;= 0
    */
-  public HColumnDescriptor(final byte [] familyName, final int minVersions,
-      final int maxVersions, final String compression, final boolean inMemory,
+  public HColumnDescriptor(final byte[] familyName, final int minVersions,
+      final int maxVersions, final boolean keepDeletedCells,
+      final String compression, final boolean inMemory,
       final boolean blockCacheEnabled, final int blocksize,
       final int timeToLive, final String bloomFilter, final int scope) {
     isLegalFamilyName(familyName);
@@ -309,14 +320,15 @@ public class HColumnDescriptor implement
       if (timeToLive == HConstants.FOREVER) {
         throw new IllegalArgumentException("Minimum versions requires TTL.");
       }
-      if (minVersions > maxVersions) {
-        throw new IllegalArgumentException("Minimum versions must be <= "+
-            "maximum versions.");
+      if (minVersions >= maxVersions) {
+        throw new IllegalArgumentException("Minimum versions must be < "
+            + "maximum versions.");
       }
     }
 
     setMaxVersions(maxVersions);
     setMinVersions(minVersions);
+    setKeepDeletedCells(keepDeletedCells);
     setInMemory(inMemory);
     setBlockCacheEnabled(blockCacheEnabled);
     setTimeToLive(timeToLive);
@@ -542,6 +554,22 @@ public class HColumnDescriptor implement
     setValue(HConstants.IN_MEMORY, Boolean.toString(inMemory));
   }
 
+  public boolean getKeepDeletedCells() {
+    String value = getValue(KEEP_DELETED_CELLS);
+    if (value != null) {
+      return Boolean.valueOf(value).booleanValue();
+    }
+    return DEFAULT_KEEP_DELETED;
+  }
+
+  /**
+   * @param keepDeletedCells True if deleted cells should not be collected
+   * immediately.
+   */
+  public void setKeepDeletedCells(boolean keepDeletedCells) {
+    setValue(KEEP_DELETED_CELLS, Boolean.toString(keepDeletedCells));
+  }
+
   /**
    * @return Time-to-live of cell contents, in seconds.
    */

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java Fri Oct 21 19:57:07 2011
@@ -207,6 +207,14 @@ public class KeyValue implements Writabl
   // the row cached
   private volatile byte [] rowCache = null;
 
+  /**
+   * @return True if a delete type, a {@link KeyValue.Type#Delete} or
+   * a {@link KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
+   * KeyValue type.
+   */
+  public static boolean isDelete(byte t) {
+    return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
+  }
 
   /** Here be dragons **/
 
@@ -1038,8 +1046,7 @@ public class KeyValue implements Writabl
    * KeyValue type.
    */
   public boolean isDelete() {
-    int t = getType();
-    return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
+    return KeyValue.isDelete(getType());
   }
 
   /**

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Attributes.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Attributes.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Attributes.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Attributes.java Fri Oct 21 19:57:07 2011
@@ -26,6 +26,7 @@ public interface Attributes {
   /**
    * Sets an attribute.
    * In case value = null attribute is removed from the attributes map.
+   * Attribute names starting with _ indicate system attributes.
    * @param name attribute name
    * @param value attribute value
    */

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java Fri Oct 21 19:57:07 2011
@@ -82,12 +82,13 @@ import java.util.TreeSet;
  * execute {@link #setCacheBlocks(boolean)}.
  */
 public class Scan extends OperationWithAttributes implements Writable {
+  private static final String RAW_ATTR = "_raw_";
+
   private static final byte SCAN_VERSION = (byte)2;
   private byte [] startRow = HConstants.EMPTY_START_ROW;
   private byte [] stopRow  = HConstants.EMPTY_END_ROW;
   private int maxVersions = 1;
   private int batch = -1;
-
   // If application wants to collect scan metrics, it needs to
   // call scan.setAttribute(SCAN_ATTRIBUTES_ENABLE, Bytes.toBytes(Boolean.TRUE))
   static public String SCAN_ATTRIBUTES_METRICS_ENABLE =
@@ -695,4 +696,26 @@ public class Scan extends OperationWithA
     }
     return cols.toString();
   }
+
+  /**
+   * Enable/disable "raw" mode for this scan.
+   * If "raw" is enabled the scan will return all
+   * delete markers and deleted rows that have not
+   * been collected, yet.
+   * This is mostly useful for Scan on column families
+   * that have KEEP_DELETED_CELLS enabled.
+   * It is an error to specify any column when "raw" is set.
+   * @param raw True/False to enable/disable "raw" mode.
+   */
+  public void setRaw(boolean raw) {
+    setAttribute(RAW_ATTR, Bytes.toBytes(raw));
+  }
+
+  /**
+   * @return True if this Scan is in "raw" mode.
+   */
+  public boolean isRaw() {
+    byte[] attr = getAttribute(RAW_ATTR);
+    return attr == null ? false : Bytes.toBoolean(attr);
+  }
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java Fri Oct 21 19:57:07 2011
@@ -47,12 +47,14 @@ public interface ColumnTracker {
    * @param offset
    * @param length
    * @param ttl The timeToLive to enforce.
+   * @param type The type of the KeyValue
    * @return The match code instance.
    * @throws IOException in case there is an internal consistency problem
    *      caused by a data corruption.
    */
-  public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
-      int length, long ttl) throws IOException;
+  public ScanQueryMatcher.MatchCode checkColumn(byte[] bytes, int offset,
+      int length, long ttl, byte type)
+      throws IOException;
 
   /**
    * Updates internal variables in between files

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java Fri Oct 21 19:57:07 2011
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.NavigableSet;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -97,16 +98,14 @@ public class ExplicitColumnTracker imple
   }
 
   /**
-   * Checks against the parameters of the query and the columns which have
-   * already been processed by this query.
-   * @param bytes KeyValue buffer
-   * @param offset offset to the start of the qualifier
-   * @param length length of the qualifier
-   * @param timestamp timestamp of the key being checked
-   * @return MatchCode telling ScanQueryMatcher what action to take
+   * {@inheritDoc}
    */
+  @Override
   public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
-      int length, long timestamp) {
+      int length, long timestamp, byte type) {
+    // delete markers should never be passed to an
+    // *Explicit*ColumnTracker
+    assert !KeyValue.isDelete(type);
     do {
       // No more columns left, we are done with this query
       if(this.columns.size() == 0) {
@@ -143,12 +142,12 @@ public class ExplicitColumnTracker imple
           if (this.columns.size() == this.index) {
             // We have served all the requested columns.
             this.column = null;
-	    return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
+            return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
           } else {
-	    // We are done with current column; advance to next column
-	    // of interest.
+            // We are done with current column; advance to next column
+            // of interest.
             this.column = this.columns.get(this.index);
-	    return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
+            return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
           }
         } else {
           setTS(timestamp);

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Fri Oct 21 19:57:07 2011
@@ -20,6 +20,9 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
+import java.io.IOException;
+import java.util.NavigableSet;
+
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
@@ -29,8 +32,7 @@ import org.apache.hadoop.hbase.io.TimeRa
 import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult;
 import org.apache.hadoop.hbase.util.Bytes;
 
-import java.io.IOException;
-import java.util.NavigableSet;
+import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
 
 /**
  * A query matcher that is specifically designed for the scan case.
@@ -39,72 +41,104 @@ public class ScanQueryMatcher {
   // Optimization so we can skip lots of compares when we decide to skip
   // to the next row.
   private boolean stickyNextRow;
-  private byte[] stopRow;
+  private final byte[] stopRow;
 
-  protected TimeRange tr;
+  private final TimeRange tr;
 
-  protected Filter filter;
+  private final Filter filter;
 
   /** Keeps track of deletes */
-  protected DeleteTracker deletes;
-  protected boolean retainDeletesInOutput;
+  private final DeleteTracker deletes;
+
+  /*
+   * The following three booleans define how we deal with deletes.
+   * There are three different aspects:
+   * 1. Whether to keep delete markers. This is used in compactions.
+   *    Minor compactions always keep delete markers.
+   * 2. Whether to keep deleted rows. This is also used in compactions,
+   *    if the store is set to keep deleted rows. This implies keeping
+   *    the delete markers as well.
+   *    In this case deleted rows are subject to the normal max version
+   *    and TTL/min version rules just like "normal" rows.
+   * 3. Whether a scan can do time travel queries even before a delete
+   *    marker to reach deleted rows.
+   */
+  /** whether to retain delete markers */
+  private final boolean retainDeletesInOutput;
+  /** whether to return deleted rows */
+  private final boolean keepDeletedCells;
+  /** whether time range queries can see rows "behind" a delete */
+  private final boolean seePastDeleteMarkers;
+
 
   /** Keeps track of columns and versions */
-  protected ColumnTracker columns;
+  private final ColumnTracker columns;
 
   /** Key to seek to in memstore and StoreFiles */
-  protected KeyValue startKey;
+  private final KeyValue startKey;
 
   /** Row comparator for the region this query is for */
-  KeyValue.KeyComparator rowComparator;
+  private final KeyValue.KeyComparator rowComparator;
 
+  /* row is not private for tests */
   /** Row the query is on */
-  protected byte [] row;
+  byte [] row;
+  
+  /**
+   * Oldest put in any of the involved store files.
+   * Used to decide whether it is ok to delete
+   * family delete markers if this store keeps
+   * deleted KVs.
+   */
+  private final long earliestPutTs;
 
   /**
-   * Constructs a ScanQueryMatcher for a Scan.
+   * Construct a QueryMatcher for a scan
    * @param scan
-   * @param family
+   * @param scanInfo The store's immutable scan info
    * @param columns
-   * @param ttl
-   * @param rowComparator
+   * @param scanType Type of the scan
+   * @param earliestPutTs Earliest put seen in any of the store files.
    */
-  public ScanQueryMatcher(Scan scan, byte [] family,
-      NavigableSet<byte[]> columns, long ttl,
-      KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions,
-      boolean retainDeletesInOutput) {
+  public ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
+      NavigableSet<byte[]> columns, StoreScanner.ScanType scanType,
+      long earliestPutTs) {
     this.tr = scan.getTimeRange();
-    this.rowComparator = rowComparator;
+    this.rowComparator = scanInfo.getComparator().getRawComparator();
     this.deletes =  new ScanDeleteTracker();
     this.stopRow = scan.getStopRow();
-    this.startKey = KeyValue.createFirstOnRow(scan.getStartRow(), family, null);
+    this.startKey = KeyValue.createFirstOnRow(scan.getStartRow(), scanInfo.getFamily(), null);
     this.filter = scan.getFilter();
-    this.retainDeletesInOutput = retainDeletesInOutput;
+    this.earliestPutTs = earliestPutTs;
+
+    /* how to deal with deletes */
+    // keep deleted cells: if compaction or raw scan
+    this.keepDeletedCells = (scanInfo.getKeepDeletedCells() && scanType != ScanType.USER_SCAN) || scan.isRaw();
+    // retain deletes: if minor compaction or raw scan
+    this.retainDeletesInOutput = scanType == ScanType.MINOR_COMPACT || scan.isRaw();
+    // seePastDeleteMarker: user initiated scans
+    this.seePastDeleteMarkers = scanInfo.getKeepDeletedCells() && scanType == ScanType.USER_SCAN;
 
+    int maxVersions = Math.min(scan.getMaxVersions(), scanInfo.getMaxVersions());
     // Single branch to deal with two types of reads (columns vs all in family)
     if (columns == null || columns.size() == 0) {
       // use a specialized scan for wildcard column tracker.
-      this.columns = new ScanWildcardColumnTracker(minVersions, maxVersions, ttl);
+      this.columns = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersions, scanInfo.getTtl());
     } else {
       // We can share the ExplicitColumnTracker, diff is we reset
       // between rows, not between storefiles.
-      this.columns = new ExplicitColumnTracker(columns, minVersions, maxVersions,
-          ttl);
+      this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions,
+          scanInfo.getTtl());
     }
   }
 
-  public ScanQueryMatcher(Scan scan, byte [] family,
-      NavigableSet<byte[]> columns, long ttl,
-      KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions) {
-      /* By default we will not include deletes */
-      /* deletes are included explicitly (for minor compaction) */
-      this(scan, family, columns, ttl, rowComparator, minVersions, maxVersions,
-          false);
-  }
-  public ScanQueryMatcher(Scan scan, byte [] family,
-      NavigableSet<byte[]> columns, long ttl,
-      KeyValue.KeyComparator rowComparator, int maxVersions) {
-    this(scan, family, columns, ttl, rowComparator, 0, maxVersions);
+  /*
+   * Constructor for tests
+   */
+  ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
+      NavigableSet<byte[]> columns) {
+    this(scan, scanInfo, columns, StoreScanner.ScanType.USER_SCAN,
+        HConstants.LATEST_TIMESTAMP);
   }
 
   /**
@@ -171,21 +205,50 @@ public class ScanQueryMatcher {
         return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
     }
 
+    /*
+     * The delete logic is pretty complicated now.
+     * This is corroborated by the following:
+     * 1. The store might be instructed to keep deleted rows around.
+     * 2. A scan can optionally see past a delete marker now.
+     * 3. If deleted rows are kept, we have to find out when we can
+     *    remove the delete markers.
+     * 4. Family delete markers are always first (regardless of their TS)
+     * 5. Delete markers should not be counted as version
+     * 6. Delete markers affect puts of the *same* TS
+     * 7. Delete markers need to be version counted together with puts
+     *    they affect
+     */
     byte type = kv.getType();
-    if (isDelete(type)) {
-      if (tr.withinOrAfterTimeRange(timestamp)) {
-        this.deletes.add(bytes, offset, qualLength, timestamp, type);
+    if (kv.isDelete()) {
+      if (!keepDeletedCells) {
+        // first ignore delete markers if the scanner can do so, and the
+        // range does not include the marker
+        boolean includeDeleteMarker = seePastDeleteMarkers ?
+            // +1, to allow a range between a delete and put of same TS
+            tr.withinTimeRange(timestamp+1) :
+            tr.withinOrAfterTimeRange(timestamp);
+        if (includeDeleteMarker) {
+          this.deletes.add(bytes, offset, qualLength, timestamp, type);
+        }
         // Can't early out now, because DelFam come before any other keys
       }
       if (retainDeletesInOutput) {
+        // always include
         return MatchCode.INCLUDE;
-      }
-      else {
+      } else if (keepDeletedCells) {
+        if (timestamp < earliestPutTs) {
+          // keeping deleted rows, but there are no puts older than
+          // this delete in the store files.
+          return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
+        }
+        // else: fall through and do version counting on the
+        // delete markers
+      } else {
         return MatchCode.SKIP;
       }
-    }
-
-    if (!this.deletes.isEmpty()) {
+      // note the following next else if...
+      // delete markers are not subject to other delete markers
+    } else if (!this.deletes.isEmpty()) {
       DeleteResult deleteResult = deletes.isDeleted(bytes, offset, qualLength,
           timestamp);
       switch (deleteResult) {
@@ -228,7 +291,8 @@ public class ScanQueryMatcher {
       }
     }
 
-    MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, timestamp);
+    MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength,
+        timestamp, type);
     /*
      * According to current implementation, colChecker can only be
      * SEEK_NEXT_COL, SEEK_NEXT_ROW, SKIP or INCLUDE. Therefore, always return
@@ -269,11 +333,6 @@ public class ScanQueryMatcher {
     stickyNextRow = false;
   }
 
-  // should be in KeyValue.
-  protected boolean isDelete(byte type) {
-    return (type != KeyValue.Type.Put.getCode());
-  }
-
   /**
    *
    * @return the start key

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java Fri Oct 21 19:57:07 2011
@@ -22,9 +22,8 @@ package org.apache.hadoop.hbase.regionse
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -32,17 +31,17 @@ import org.apache.hadoop.hbase.util.Byte
  * Keeps track of the columns for a scan if they are not explicitly specified
  */
 public class ScanWildcardColumnTracker implements ColumnTracker {
-  private static final Log LOG =
-    LogFactory.getLog(ScanWildcardColumnTracker.class);
   private byte [] columnBuffer = null;
   private int columnOffset = 0;
   private int columnLength = 0;
   private int currentCount = 0;
   private int maxVersions;
   private int minVersions;
-  /* Keeps track of the latest timestamp included for current column.
+  /* Keeps track of the latest timestamp and type included for current column.
    * Used to eliminate duplicates. */
   private long latestTSOfCurrentColumn;
+  private byte latestTypeOfCurrentColumn;
+
   private long oldestStamp;
 
   /**
@@ -58,41 +57,38 @@ public class ScanWildcardColumnTracker i
   }
 
   /**
-   * Can only return INCLUDE or SKIP, since returning "NEXT" or
-   * "DONE" would imply we have finished with this row, when
-   * this class can't figure that out.
-   *
-   * @param bytes
-   * @param offset
-   * @param length
-   * @param timestamp
-   * @return The match code instance.
+   * {@inheritDoc}
+   * This receives puts *and* deletes.
+   * Deletes do not count as a version, but rather take the version
+   * of the previous put (so eventually all but the last can be reclaimed).
    */
   @Override
   public MatchCode checkColumn(byte[] bytes, int offset, int length,
-      long timestamp) throws IOException {
+      long timestamp, byte type) throws IOException {
+    
     if (columnBuffer == null) {
       // first iteration.
       resetBuffer(bytes, offset, length);
-      return checkVersion(++currentCount, timestamp);
+      // do not count a delete marker as another version
+      return checkVersion(type, timestamp);
     }
     int cmp = Bytes.compareTo(bytes, offset, length,
         columnBuffer, columnOffset, columnLength);
     if (cmp == 0) {
       //If column matches, check if it is a duplicate timestamp
-      if (sameAsPreviousTS(timestamp)) {
+      if (sameAsPreviousTSAndType(timestamp, type)) {
         return ScanQueryMatcher.MatchCode.SKIP;
       }
-      return checkVersion(++currentCount, timestamp);
+      return checkVersion(type, timestamp);
     }
 
-    resetTS();
+    resetTSAndType();
 
     // new col > old col
     if (cmp > 0) {
       // switched columns, lets do something.x
       resetBuffer(bytes, offset, length);
-      return checkVersion(++currentCount, timestamp);
+      return checkVersion(type, timestamp);
     }
 
     // new col < oldcol
@@ -112,13 +108,25 @@ public class ScanWildcardColumnTracker i
     currentCount = 0;
   }
 
-  private MatchCode checkVersion(int version, long timestamp) {
-    if (version > maxVersions) {
+  /**
+   * Check whether this version should be retained.
+   * There are 4 variables considered:
+   * If this version is past max versions -> skip it
+   * If this kv has expired or was deleted, check min versions
+   * to decide whether to skip it or not.
+   *
+   * Increase the version counter unless this is a delete
+   */
+  private MatchCode checkVersion(byte type, long timestamp) {
+    if (!KeyValue.isDelete(type)) {
+      currentCount++;
+    }
+    if (currentCount > maxVersions) {
       return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; // skip to next col
     }
     // keep the KV if required by minversions or it is not expired, yet
-    if (version <= minVersions || !isExpired(timestamp)) {
-      setTS(timestamp);
+    if (currentCount <= minVersions || !isExpired(timestamp)) {
+      setTSAndType(timestamp, type);
       return ScanQueryMatcher.MatchCode.INCLUDE;
     } else {
       return MatchCode.SEEK_NEXT_COL;
@@ -136,19 +144,21 @@ public class ScanWildcardColumnTracker i
   @Override
   public void reset() {
     columnBuffer = null;
-    resetTS();
+    resetTSAndType();
   }
 
-  private void resetTS() {
+  private void resetTSAndType() {
     latestTSOfCurrentColumn = HConstants.LATEST_TIMESTAMP;
+    latestTypeOfCurrentColumn = 0;
   }
 
-  private void setTS(long timestamp) {
+  private void setTSAndType(long timestamp, byte type) {
     latestTSOfCurrentColumn = timestamp;
+    latestTypeOfCurrentColumn = type;
   }
 
-  private boolean sameAsPreviousTS(long timestamp) {
-    return timestamp == latestTSOfCurrentColumn;
+  private boolean sameAsPreviousTSAndType(long timestamp, byte type) {
+    return timestamp == latestTSOfCurrentColumn && type == latestTypeOfCurrentColumn;
   }
 
   private boolean isExpired(long timestamp) {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri Oct 21 19:57:07 2011
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HColumnDe
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HeapSize;
@@ -49,6 +50,7 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
 import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -95,9 +97,7 @@ public class Store implements HeapSize {
   final Configuration conf;
   final CacheConfig cacheConf;
   // ttl in milliseconds.
-  protected long ttl;
-  protected int minVersions;
-  protected int maxVersions;
+  private long ttl;
   long majorCompactionTime;
   private final int minFilesToCompact;
   private final int maxFilesToCompact;
@@ -119,6 +119,8 @@ public class Store implements HeapSize {
   private CompactionProgress progress;
   private final int compactionKVMax;
 
+  // not private for testing
+  /* package */ScanInfo scanInfo;
   /*
    * List of store files inside this store. This is an immutable list that
    * is atomically replaced when its contents change.
@@ -183,8 +185,9 @@ public class Store implements HeapSize {
       // second -> ms adjust for user data
       this.ttl *= 1000;
     }
-    this.minVersions = family.getMinVersions();
-    this.maxVersions = family.getMaxVersions();
+    scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
+        family.getMaxVersions(), ttl, family.getKeepDeletedCells(),
+        this.comparator);
     this.memstore = new MemStore(conf, this.comparator);
     this.storeNameStr = Bytes.toString(this.family.getName());
 
@@ -477,13 +480,13 @@ public class Store implements HeapSize {
       return null;
     }
     Scan scan = new Scan();
-    scan.setMaxVersions(maxVersions);
+    scan.setMaxVersions(scanInfo.getMaxVersions());
     // Use a store scanner to find which rows to flush.
     // Note that we need to retain deletes, hence
-    // pass true as the StoreScanner's retainDeletesInOutput argument.
-    InternalScanner scanner = new StoreScanner(this, scan,
-        Collections.singletonList(new CollectionBackedScanner(set,
-            this.comparator)), true);
+    // treat this as a minor compaction.
+    InternalScanner scanner = new StoreScanner(this, scan, Collections
+        .singletonList(new CollectionBackedScanner(set, this.comparator)),
+        ScanType.MINOR_COMPACT, HConstants.OLDEST_TIMESTAMP);
     try {
       // TODO:  We can fail in the below block before we complete adding this
       // flush to list of store files.  Add cleanup of anything put on filesystem
@@ -1108,6 +1111,7 @@ public class Store implements HeapSize {
       throws IOException {
     // calculate maximum key count after compaction (for blooms)
     int maxKeyCount = 0;
+    long earliestPutTs = HConstants.LATEST_TIMESTAMP;
     for (StoreFile file : filesToCompact) {
       StoreFile.Reader r = file.getReader();
       if (r != null) {
@@ -1123,6 +1127,19 @@ public class Store implements HeapSize {
             ", size=" + StringUtils.humanReadableInt(r.length()) );
         }
       }
+      // For major compactions calculate the earliest put timestamp
+      // of all involved storefiles. This is used to remove 
+      // family delete marker during the compaction.
+      if (majorCompaction) {
+        byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+        if (tmp == null) {
+          // there's a file with no information, must be an old one
+          // assume we have very old puts
+          earliestPutTs = HConstants.OLDEST_TIMESTAMP;
+        } else {
+          earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
+        }
+      }
     }
 
     // keep track of compaction progress
@@ -1141,7 +1158,9 @@ public class Store implements HeapSize {
         Scan scan = new Scan();
         scan.setMaxVersions(family.getMaxVersions());
         /* include deletes, unless we are doing a major compaction */
-        scanner = new StoreScanner(this, scan, scanners, !majorCompaction);
+        scanner = new StoreScanner(this, scan, scanners,
+            majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
+            earliestPutTs);
         if (region.getCoprocessorHost() != null) {
           InternalScanner cpScanner = region.getCoprocessorHost().preCompact(
               this, scanner);
@@ -1374,7 +1393,7 @@ public class Store implements HeapSize {
     // at all (expired or not) has at least one version that will not expire.
     // Note that this method used to take a KeyValue as arguments. KeyValue
     // can be back-dated, a row key cannot.
-    long ttlToUse = this.minVersions > 0 ? Long.MAX_VALUE : this.ttl;
+    long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.ttl;
 
     KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
 
@@ -1842,15 +1861,16 @@ public class Store implements HeapSize {
     return this.cacheConf;
   }
 
-  public static final long FIXED_OVERHEAD = ClassSize.align(
-      ClassSize.OBJECT + (17 * ClassSize.REFERENCE) +
-      (7 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
-      (7 * Bytes.SIZEOF_INT) + (1 * Bytes.SIZEOF_BOOLEAN));
-
-  public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
-      ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +
-      ClassSize.CONCURRENT_SKIPLISTMAP +
-      ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT);
+  public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+      + (18 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG)
+      + (1 * Bytes.SIZEOF_DOUBLE) + (5 * Bytes.SIZEOF_INT)
+      + Bytes.SIZEOF_BOOLEAN);
+
+  public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+      + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
+      + ClassSize.CONCURRENT_SKIPLISTMAP
+      + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
+      + ScanInfo.FIXED_OVERHEAD);
 
   @Override
   public long heapSize() {
@@ -1861,4 +1881,62 @@ public class Store implements HeapSize {
     return comparator;
   }
 
+  /**
+   * Immutable information for scans over a store.
+   */
+  public static class ScanInfo {
+    private byte[] family;
+    private int minVersions;
+    private int maxVersions;
+    private long ttl;
+    private boolean keepDeletedCells;
+    private KVComparator comparator;
+
+    public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+        + (2 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_INT)
+        + Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN);
+
+    /**
+     * @param family Name of this store's column family
+     * @param minVersions Store's MIN_VERSIONS setting
+     * @param maxVersions Store's VERSIONS setting
+     * @param ttl Store's TTL (in ms)
+     * @param keepDeletedCells Store's keepDeletedCells setting
+     * @param comparator The store's comparator
+     */
+    public ScanInfo(byte[] family, int minVersions, int maxVersions, long ttl,
+        boolean keepDeletedCells, KVComparator comparator) {
+
+      this.family = family;
+      this.minVersions = minVersions;
+      this.maxVersions = maxVersions;
+      this.ttl = ttl;
+      this.keepDeletedCells = keepDeletedCells;
+      this.comparator = comparator;
+    }
+
+    public byte[] getFamily() {
+      return family;
+    }
+
+    public int getMinVersions() {
+      return minVersions;
+    }
+
+    public int getMaxVersions() {
+      return maxVersions;
+    }
+
+    public long getTtl() {
+      return ttl;
+    }
+
+    public boolean getKeepDeletedCells() {
+      return keepDeletedCells;
+    }
+
+    public KVComparator getComparator() {
+      return comparator;
+    }
+  }
 }

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Fri Oct 21 19:57:07 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
@@ -116,6 +117,9 @@ public class StoreFile {
   /** Key for Timerange information in metadata*/
   public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
 
+  /** Key for timestamp of earliest-put in metadata*/
+  public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
+
   // Make default block size for StoreFiles 8k while testing.  TODO: FIX!
   // Need to make it 8k for testing.
   public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
@@ -737,6 +741,7 @@ public class StoreFile {
     private int lastBloomKeyOffset, lastBloomKeyLen;
     private KVComparator kvComparator;
     private KeyValue lastKv = null;
+    private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
 
     TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
     /* isTimeRangeTrackerSet keeps track if the timeRange has already been set
@@ -796,14 +801,15 @@ public class StoreFile {
       writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
       writer.appendFileInfo(MAJOR_COMPACTION_KEY,
           Bytes.toBytes(majorCompaction));
-      appendTimeRangeMetadata();
+      appendTrackedTimestampsToMetadata();
     }
 
     /**
-     * Add TimestampRange to Metadata
+     * Add TimestampRange and earliest put timestamp to Metadata
      */
-    public void appendTimeRangeMetadata() throws IOException {
+    public void appendTrackedTimestampsToMetadata() throws IOException {
       appendFileInfo(TIMERANGE_KEY,WritableUtils.toByteArray(timeRangeTracker));
+      appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
     }
 
     /**
@@ -816,26 +822,19 @@ public class StoreFile {
     }
 
     /**
+     * Record the earliest Put timestamp.
+     *
      * If the timeRangeTracker is not set,
      * update TimeRangeTracker to include the timestamp of this key
      * @param kv
      * @throws IOException
      */
-    public void includeInTimeRangeTracker(final KeyValue kv) {
-      if (!isTimeRangeTrackerSet) {
-        timeRangeTracker.includeTimestamp(kv);
+    public void trackTimestamps(final KeyValue kv) {
+      if (KeyValue.Type.Put.getCode() == kv.getType()) {
+        earliestPutTs = Math.min(earliestPutTs, kv.getTimestamp());
       }
-    }
-
-    /**
-     * If the timeRangeTracker is not set,
-     * update TimeRangeTracker to include the timestamp of this key
-     * @param key
-     * @throws IOException
-     */
-    public void includeInTimeRangeTracker(final byte [] key) {
       if (!isTimeRangeTrackerSet) {
-        timeRangeTracker.includeTimestamp(key);
+        timeRangeTracker.includeTimestamp(kv);
       }
     }
 
@@ -908,7 +907,7 @@ public class StoreFile {
         }
       }
       writer.append(kv);
-      includeInTimeRangeTracker(kv);
+      trackTimestamps(kv);
     }
 
     public Path getPath() {

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Fri Oct 21 19:57:07 2011
@@ -20,9 +20,10 @@
 
 package org.apache.hadoop.hbase.regionserver;
 
-import org.apache.commons.lang.NotImplementedException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Scan;
 
@@ -86,12 +87,14 @@ class StoreScanner extends NonLazyKeyVal
    * @throws IOException
    */
   StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
-                              throws IOException {
+  throws IOException {
     this(store, scan.getCacheBlocks(), scan, columns);
-    matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
-        columns, store.ttl, store.comparator.getRawComparator(),
-        store.minVersions, store.versionsToReturn(scan.getMaxVersions()),
-        false);
+    if (columns != null && scan.isRaw()) {
+      throw new DoNotRetryIOException(
+          "Cannot specify any column for a raw scan");
+    }
+    matcher = new ScanQueryMatcher(scan, store.scanInfo, columns,
+        ScanType.USER_SCAN, HConstants.LATEST_TIMESTAMP);
 
     // Pass columns to try to filter out unnecessary StoreFiles.
     List<KeyValueScanner> scanners = getScanners(scan, columns);
@@ -124,12 +127,12 @@ class StoreScanner extends NonLazyKeyVal
    * @param scan the spec
    * @param scanners ancilliary scanners
    */
-  StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners,
-      boolean retainDeletesInOutput) throws IOException {
+  StoreScanner(Store store, Scan scan,
+      List<? extends KeyValueScanner> scanners, ScanType scanType,
+      long earliestPutTs) throws IOException {
     this(store, false, scan, null);
-    matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
-        null, store.ttl, store.comparator.getRawComparator(), store.minVersions,
-        store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput);
+    matcher = new ScanQueryMatcher(scan, store.scanInfo, null, scanType,
+        earliestPutTs);
 
     // Seek all scanners to the initial key
     for(KeyValueScanner scanner : scanners) {
@@ -141,20 +144,18 @@ class StoreScanner extends NonLazyKeyVal
   }
 
   // Constructor for testing.
-  StoreScanner(final Scan scan, final byte [] colFamily, final long ttl,
-      final KeyValue.KVComparator comparator,
-      final NavigableSet<byte[]> columns,
-      final List<KeyValueScanner> scanners)
-        throws IOException {
+  StoreScanner(final Scan scan, Store.ScanInfo scanInfo,
+      StoreScanner.ScanType scanType, final NavigableSet<byte[]> columns,
+      final List<KeyValueScanner> scanners) throws IOException {
     this(null, scan.getCacheBlocks(), scan, columns);
-    this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
-        comparator.getRawComparator(), 0, scan.getMaxVersions(), false);
+    this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
+        HConstants.LATEST_TIMESTAMP);
 
     // Seek all scanners to the initial key
-    for(KeyValueScanner scanner : scanners) {
+    for (KeyValueScanner scanner : scanners) {
       scanner.seek(matcher.getStartKey());
     }
-    heap = new KeyValueHeap(scanners, comparator);
+    heap = new KeyValueHeap(scanners, scanInfo.getComparator());
   }
 
   /*
@@ -476,5 +477,13 @@ class StoreScanner extends NonLazyKeyVal
     lazySeekEnabledGlobally = enable;
   }
 
+  /**
+   * Enum to distinguish general scan types.
+   */
+  public static enum ScanType {
+    MAJOR_COMPACT,
+    MINOR_COMPACT,
+    USER_SCAN
+  }
 }
 

Modified: hbase/trunk/src/main/ruby/hbase/admin.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/hbase/admin.rb?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/hbase/admin.rb (original)
+++ hbase/trunk/src/main/ruby/hbase/admin.rb Fri Oct 21 19:57:07 2011
@@ -463,6 +463,7 @@ module Hbase
       family.setBlocksize(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE)
       family.setMaxVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS)
       family.setMinVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS)
+      family.setKeepDeletedRows(JBoolean.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS)
       if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER)
         bloomtype = arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER].upcase
         unless org.apache.hadoop.hbase.regionserver.StoreFile::BloomType.constants.include?(bloomtype)      

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java Fri Oct 21 19:57:07 2011
@@ -198,7 +198,7 @@ public abstract class HBaseTestCase exte
   protected HTableDescriptor createTableDescriptor(final String name,
       final int versions) {
     return createTableDescriptor(name, HColumnDescriptor.DEFAULT_MIN_VERSIONS,
-        versions, HConstants.FOREVER);
+        versions, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
   }
 
   /**
@@ -209,21 +209,21 @@ public abstract class HBaseTestCase exte
    * @return Column descriptor.
    */
   protected HTableDescriptor createTableDescriptor(final String name,
-      final int minVersions, final int versions, final int ttl) {
+      final int minVersions, final int versions, final int ttl, boolean keepDeleted) {
     HTableDescriptor htd = new HTableDescriptor(name);
     htd.addFamily(new HColumnDescriptor(fam1, minVersions, versions,
-      HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
-      HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
-      HColumnDescriptor.DEFAULT_BLOOMFILTER,
-      HConstants.REPLICATION_SCOPE_LOCAL));
+        keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
+        HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
+        HColumnDescriptor.DEFAULT_BLOOMFILTER,
+        HConstants.REPLICATION_SCOPE_LOCAL));
     htd.addFamily(new HColumnDescriptor(fam2, minVersions, versions,
-        HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
+        keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
         HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
         HColumnDescriptor.DEFAULT_BLOOMFILTER,
         HConstants.REPLICATION_SCOPE_LOCAL));
     htd.addFamily(new HColumnDescriptor(fam3, minVersions, versions,
-        HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
-        HColumnDescriptor.DEFAULT_BLOCKSIZE,  ttl,
+        keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
+        HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
         HColumnDescriptor.DEFAULT_BLOOMFILTER,
         HConstants.REPLICATION_SCOPE_LOCAL));
     return htd;

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Fri Oct 21 19:57:07 2011
@@ -20,6 +20,7 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
@@ -71,10 +72,8 @@ import org.apache.hadoop.hbase.io.hfile.
 import org.apache.hadoop.hbase.io.hfile.CacheConfig;
 import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
 import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
 import org.junit.After;
 import org.junit.AfterClass;
@@ -129,6 +128,82 @@ public class TestFromClientSide {
   }
 
   /**
+   * Basic client side validation of HBASE-4536
+   */
+   @Test
+   public void testKeepDeletedCells() throws Exception {
+     final byte[] TABLENAME = Bytes.toBytes("testKeepDeletesCells");
+     final byte[] FAMILY = Bytes.toBytes("family");
+     final byte[] C0 = Bytes.toBytes("c0");
+
+     final byte[] T1 = Bytes.toBytes("T1");
+     final byte[] T2 = Bytes.toBytes("T2");
+     final byte[] T3 = Bytes.toBytes("T3");
+     HColumnDescriptor hcd = new HColumnDescriptor(FAMILY,
+         HColumnDescriptor.DEFAULT_MIN_VERSIONS,
+         HColumnDescriptor.DEFAULT_VERSIONS,
+         true,
+         HColumnDescriptor.DEFAULT_COMPRESSION,
+         HColumnDescriptor.DEFAULT_IN_MEMORY,
+         HColumnDescriptor.DEFAULT_BLOCKCACHE,
+         HColumnDescriptor.DEFAULT_BLOCKSIZE,
+         HColumnDescriptor.DEFAULT_TTL,
+         HColumnDescriptor.DEFAULT_BLOOMFILTER,
+         HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
+
+     HTableDescriptor desc = new HTableDescriptor(TABLENAME);
+     desc.addFamily(hcd);
+     TEST_UTIL.getHBaseAdmin().createTable(desc);
+     Configuration c = TEST_UTIL.getConfiguration();
+     HTable h = new HTable(c, TABLENAME);
+
+     long ts = System.currentTimeMillis();
+     Put p = new Put(T1, ts);
+     p.add(FAMILY, C0, T1);
+     h.put(p);
+     p = new Put(T1, ts+2);
+     p.add(FAMILY, C0, T2);
+     h.put(p);
+     p = new Put(T1, ts+4);
+     p.add(FAMILY, C0, T3);
+     h.put(p);
+
+     Delete d = new Delete(T1, ts+2, null);
+     h.delete(d);
+
+     d = new Delete(T1, ts+3, null);
+     d.deleteColumns(FAMILY, C0, ts+3);
+     h.delete(d);
+
+     Get g = new Get(T1);
+     // does *not* include the delete
+     g.setTimeRange(0, ts+3);
+     Result r = h.get(g);
+     assertArrayEquals(T2, r.getValue(FAMILY, C0));
+
+     Scan s = new Scan(T1);
+     s.setTimeRange(0, ts+3);
+     s.setMaxVersions();
+     ResultScanner scanner = h.getScanner(s);
+     KeyValue[] kvs = scanner.next().raw();
+     assertArrayEquals(T2, kvs[0].getValue());
+     assertArrayEquals(T1, kvs[1].getValue());
+     scanner.close();
+
+     s = new Scan(T1);
+     s.setRaw(true);
+     s.setMaxVersions();
+     scanner = h.getScanner(s);
+     kvs = scanner.next().raw();
+     assertTrue(kvs[0].isDeleteFamily());
+     assertArrayEquals(T3, kvs[1].getValue());
+     assertTrue(kvs[2].isDelete());
+     assertArrayEquals(T2, kvs[3].getValue());
+     assertArrayEquals(T1, kvs[4].getValue());
+     scanner.close();
+   }
+
+   /**
    * HBASE-2468 use case 1 and 2: region info de/serialization
    */
    @Test

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Fri Oct 21 19:57:07 2011
@@ -24,13 +24,8 @@ import static org.mockito.Mockito.spy;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
-import static org.junit.Assert.fail;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -240,11 +235,15 @@ public class TestCompaction extends HBas
 
     // Multiple versions allowed for an entry, so the delete isn't enough
     // Lower TTL and expire to ensure that all our entries have been wiped
-    final int ttlInSeconds = 1;
+    final int ttl = 1000;
     for (Store store: this.r.stores.values()) {
-      store.ttl = ttlInSeconds * 1000;
+      Store.ScanInfo old = store.scanInfo;
+      Store.ScanInfo si = new Store.ScanInfo(old.getFamily(),
+          old.getMinVersions(), old.getMaxVersions(), ttl,
+          old.getKeepDeletedCells(), old.getComparator());
+      store.scanInfo = si;
     }
-    Thread.sleep(ttlInSeconds * 1000);
+    Thread.sleep(1000);
 
     r.compactStores(true);
     int count = count();
@@ -446,11 +445,15 @@ public class TestCompaction extends HBas
 
       // Multiple versions allowed for an entry, so the delete isn't enough
       // Lower TTL and expire to ensure that all our entries have been wiped
-      final int ttlInSeconds = 1;
+      final int ttl = 1000;
       for (Store store: this.r.stores.values()) {
-        store.ttl = ttlInSeconds * 1000;
+        Store.ScanInfo old = store.scanInfo;
+        Store.ScanInfo si = new Store.ScanInfo(old.getFamily(),
+            old.getMinVersions(), old.getMaxVersions(), ttl,
+            old.getKeepDeletedCells(), old.getComparator());
+        store.scanInfo = si;
       }
-      Thread.sleep(ttlInSeconds * 1000);
+      Thread.sleep(ttl);
 
       r.compactStores(true);
       assertEquals(0, count());

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java Fri Oct 21 19:57:07 2011
@@ -27,6 +27,7 @@ import java.util.TreeSet;
 import java.util.Arrays;
 
 import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
 import org.apache.hadoop.hbase.util.Bytes;
 
@@ -54,7 +55,8 @@ public class TestExplicitColumnTracker e
     long timestamp = 0;
     //"Match"
     for(byte [] col : scannerColumns){
-      result.add(exp.checkColumn(col, 0, col.length, ++timestamp));
+      result.add(exp.checkColumn(col, 0, col.length, ++timestamp,
+          KeyValue.Type.Put.getCode()));
     }
 
     assertEquals(expected.size(), result.size());
@@ -166,13 +168,13 @@ public class TestExplicitColumnTracker e
         Long.MAX_VALUE);
     for (int i = 0; i < 100000; i+=2) {
       byte [] col = Bytes.toBytes("col"+i);
-      explicit.checkColumn(col, 0, col.length, 1);
+      explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode());
     }
     explicit.update();
 
     for (int i = 1; i < 100000; i+=2) {
       byte [] col = Bytes.toBytes("col"+i);
-      explicit.checkColumn(col, 0, col.length, 1);
+      explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode());
     }
   }
 

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java?rev=1187531&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java Fri Oct 21 19:57:07 2011
@@ -0,0 +1,725 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class TestKeepDeletes extends HBaseTestCase {
+  // Small byte[] constants. The tests in this class use them both as row keys
+  // (e.g. new Put(T1, ts)) and as cell values (e.g. p.add(c0, c0, T1)).
+  private final byte[] T0 = Bytes.toBytes("0");
+  private final byte[] T1 = Bytes.toBytes("1");
+  private final byte[] T2 = Bytes.toBytes("2");
+  private final byte[] T3 = Bytes.toBytes("3");
+  private final byte[] T4 = Bytes.toBytes("4");
+  private final byte[] T5 = Bytes.toBytes("5");
+  private final byte[] T6 = Bytes.toBytes("6");
+
+  // First two entries of COLUMNS (not declared in this class; presumably
+  // inherited from HBaseTestCase). Each is passed as a column family as well
+  // as a column qualifier by the tests in this class.
+  private final byte[] c0 = COLUMNS[0];
+  private final byte[] c1 = COLUMNS[1];
+
+  /**
+   * Basic end-to-end scenario for a store that keeps deleted cells:
+   * <ul>
+   * <li>deleted rows are retained,</li>
+   * <li>family delete markers are eventually collected,</li>
+   * <li>column delete markers are versioned,</li>
+   * <li>a time range scan of deleted rows is possible.</li>
+   * </ul>
+   */
+  public void testBasicScenario() throws Exception {
+    // keep 3 versions, rows do not expire
+    HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
+        HConstants.FOREVER, true);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    // four versions of T1/c0:c0 at ts, ts+1, ts+2 and ts+4
+    long ts = System.currentTimeMillis();
+    Put p = new Put(T1, ts);
+    p.add(c0, c0, T1);
+    region.put(p);
+    p = new Put(T1, ts+1);
+    p.add(c0, c0, T2);
+    region.put(p);
+    p = new Put(T1, ts+2);
+    p.add(c0, c0, T3);
+    region.put(p);
+    p = new Put(T1, ts+4);
+    p.add(c0, c0, T4);
+    region.put(p);
+
+    // now place a delete marker at ts+2
+    Delete d = new Delete(T1, ts+2, null);
+    region.delete(d, null, true);
+
+    // a raw scan can see the delete markers
+    // (one for each column family)
+    assertEquals(3, countDeleteMarkers(region));
+
+    // get something *before* the delete marker
+    Get g = new Get(T1);
+    g.setMaxVersions();
+    g.setTimeRange(0L, ts+2);
+    Result r = region.get(g, null);
+    checkResult(r, c0, c0, T2,T1);
+
+    // flush
+    region.flushcache();
+
+    // yep, T2 still there, T1 gone
+    // (presumably T1 is dropped because only 3 versions are kept -- confirm)
+    r = region.get(g, null);
+    checkResult(r, c0, c0, T2);
+
+    // major compact (twice, to show the result is stable)
+    region.compactStores(true);
+    region.compactStores(true);
+
+    // one delete marker left (the others did not
+    // have older puts)
+    assertEquals(1, countDeleteMarkers(region));
+
+    // still there (even after multiple compactions)
+    r = region.get(g, null);
+    checkResult(r, c0, c0, T2);
+
+    // a time range that includes the delete marker won't see past rows
+    g.setTimeRange(0L, ts+4);
+    r = region.get(g, null);
+    assertTrue(r.isEmpty());
+
+    // two more puts, this will expire the older puts.
+    p = new Put(T1, ts+5);
+    p.add(c0, c0, T5);
+    region.put(p);
+    p = new Put(T1, ts+6);
+    p.add(c0, c0, T6);
+    region.put(p);
+
+    // also add an old put again
+    // (which is past the max versions)
+    p = new Put(T1, ts);
+    p.add(c0, c0, T1);
+    region.put(p);
+    r = region.get(g, null);
+    assertTrue(r.isEmpty());
+
+    region.flushcache();
+    region.compactStores(true);
+    region.compactStores(true);
+
+    // verify that the delete marker itself was collected:
+    // re-adding the old put makes it visible again, and a raw scan
+    // finds no markers
+    region.put(p);
+    r = region.get(g, null);
+    checkResult(r, c0, c0, T1);
+    assertEquals(0, countDeleteMarkers(region));
+  }
+
+  /**
+   * Even when the store does not keep deleted cells, a "raw" scan returns
+   * everything it can find (unless discarding cells is guaranteed
+   * to have no effect).
+   * Assuming this is the desired behavior. Could also disallow "raw" scanning
+   * if the store does not have KEEP_DELETED_CELLS enabled.
+   * (can be changed easily)
+   */
+  public void testRawScanWithoutKeepingDeletes() throws Exception {
+    // KEEP_DELETED_CELLS is NOT enabled
+    HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
+        HConstants.FOREVER, false);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    long ts = System.currentTimeMillis();
+    Put p = new Put(T1, ts);
+    p.add(c0, c0, T1);
+    region.put(p);
+
+    Delete d = new Delete(T1, ts, null);
+    d.deleteColumn(c0, c0, ts);
+    region.delete(d, null, true);
+
+    // scan still returns delete markers and deleted rows
+    // (here: the put and the version delete marker)
+    Scan s = new Scan();
+    s.setRaw(true);
+    s.setMaxVersions();
+    InternalScanner scan = region.getScanner(s);
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    try {
+      scan.next(kvs);
+    } finally {
+      // release the scanner before flushing/compacting
+      scan.close();
+    }
+    assertEquals(2, kvs.size());
+
+    region.flushcache();
+    region.compactStores(true);
+
+    // after compaction they are gone
+    // (note that this is a test with a Store without
+    //  KEEP_DELETED_CELLS)
+    s = new Scan();
+    s.setRaw(true);
+    s.setMaxVersions();
+    scan = region.getScanner(s);
+    kvs = new ArrayList<KeyValue>();
+    try {
+      scan.next(kvs);
+    } finally {
+      scan.close();
+    }
+    assertTrue(kvs.isEmpty());
+  }
+
+  /**
+   * Basic verification of the existing behavior, i.e. of a store that does
+   * not keep deleted cells: reads restricted to a time range before a delete
+   * marker do not see the deleted data, and a major compaction removes the
+   * marker.
+   */
+  public void testWithoutKeepingDeletes() throws Exception {
+    // KEEP_DELETED_CELLS is NOT enabled
+    HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
+        HConstants.FOREVER, false);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    long ts = System.currentTimeMillis();
+    Put p = new Put(T1, ts);
+    p.add(c0, c0, T1);
+    region.put(p);
+    Delete d = new Delete(T1, ts+2, null);
+    d.deleteColumn(c0, c0, ts);
+    region.delete(d, null, true);
+
+    // "past" get does not see rows behind delete marker
+    Get g = new Get(T1);
+    g.setMaxVersions();
+    g.setTimeRange(0L, ts+1);
+    Result r = region.get(g, null);
+    assertTrue(r.isEmpty());
+
+    // "past" scan does not see rows behind delete marker
+    Scan s = new Scan();
+    s.setMaxVersions();
+    s.setTimeRange(0L, ts+1);
+    InternalScanner scanner = region.getScanner(s);
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    try {
+      while (scanner.next(kvs)) {
+        // drain the scanner; everything it returns accumulates in kvs
+      }
+    } finally {
+      // release the scanner before flushing/compacting
+      scanner.close();
+    }
+    assertTrue(kvs.isEmpty());
+
+    // flushing and minor compaction keep delete markers
+    region.flushcache();
+    region.compactStores();
+    assertEquals(1, countDeleteMarkers(region));
+    region.compactStores(true);
+    // major compaction deleted it
+    assertEquals(0, countDeleteMarkers(region));
+  }
+
+  /**
+   * The ExplicitColumnTracker does not support "raw" scanning, so asking the
+   * region for a raw scanner that also names explicit columns is expected to
+   * be rejected with a DoNotRetryIOException.
+   */
+  public void testRawScanWithColumns() throws Exception {
+    HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
+        HConstants.FOREVER, true);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    Scan s = new Scan();
+    s.setRaw(true);
+    s.setMaxVersions();
+    s.addColumn(c0, c0);
+
+    try {
+      // the returned scanner is irrelevant; the call itself must throw
+      region.getScanner(s);
+      fail("raw scanner with columns should have failed");
+    } catch (DoNotRetryIOException expected) {
+      // ok, this is the rejection the test is looking for
+    }
+  }
+
+  /**
+   * Verify that "raw" scanning mode returns delete markers and deleted rows.
+   */
+  public void testRawScan() throws Exception {
+    HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
+        HConstants.FOREVER, true);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    // three versions of T1/c0:c0 at ts, ts+2 and ts+4
+    long ts = System.currentTimeMillis();
+    Put p = new Put(T1, ts);
+    p.add(c0, c0, T1);
+    region.put(p);
+    p = new Put(T1, ts+2);
+    p.add(c0, c0, T2);
+    region.put(p);
+    p = new Put(T1, ts+4);
+    p.add(c0, c0, T3);
+    region.put(p);
+
+    // whole-row delete at ts+1
+    Delete d = new Delete(T1, ts+1, null);
+    region.delete(d, null, true);
+
+    // delete of the single version at ts+2
+    d = new Delete(T1, ts+2, null);
+    d.deleteColumn(c0, c0, ts+2);
+    region.delete(d, null, true);
+
+    // column delete at ts+3
+    d = new Delete(T1, ts+3, null);
+    d.deleteColumns(c0, c0, ts+3);
+    region.delete(d, null, true);
+
+    Scan s = new Scan();
+    s.setRaw(true);
+    s.setMaxVersions();
+    InternalScanner scan = region.getScanner(s);
+    List<KeyValue> kvs = new ArrayList<KeyValue>();
+    try {
+      scan.next(kvs);
+    } finally {
+      scan.close();
+    }
+
+    // fail with a readable message instead of an IndexOutOfBoundsException
+    // if the scan returned fewer cells than are inspected below
+    assertTrue("expected at least 6 KeyValues but got " + kvs.size(),
+        kvs.size() >= 6);
+    // the first six cells must be: a family delete marker, the put T3,
+    // two further delete markers, then the puts T2 and T1
+    assertTrue(kvs.get(0).isDeleteFamily());
+    assertEquals(T3, kvs.get(1).getValue());
+    assertTrue(kvs.get(2).isDelete());
+    assertTrue(kvs.get(3).isDeleteType());
+    assertEquals(T2, kvs.get(4).getValue());
+    assertEquals(T1, kvs.get(5).getValue());
+  }
+
+  /**
+   * Verify that delete markers are removed from an otherwise empty store:
+   * flushes and minor compactions keep them, a major compaction collects
+   * all of them because there are no puts they could affect.
+   */
+  public void testDeleteMarkerExpirationEmptyStore() throws Exception {
+    HTableDescriptor htd = createTableDescriptor(getName(), 0, 1,
+        HConstants.FOREVER, true);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    long ts = System.currentTimeMillis();
+
+    // column delete marker
+    Delete d = new Delete(T1, ts, null);
+    d.deleteColumns(c0, c0, ts);
+    region.delete(d, null, true);
+
+    // family delete marker
+    d = new Delete(T1, ts, null);
+    d.deleteFamily(c0);
+    region.delete(d, null, true);
+
+    // version delete marker at ts+1
+    d = new Delete(T1, ts, null);
+    d.deleteColumn(c0, c0, ts+1);
+    region.delete(d, null, true);
+    
+    // version delete marker at ts+2
+    d = new Delete(T1, ts, null);
+    d.deleteColumn(c0, c0, ts+2);
+    region.delete(d, null, true);
+
+    // 1 family marker, 1 column marker, 2 version markers
+    assertEquals(4, countDeleteMarkers(region));
+
+    // neither flush nor minor compaction removes any marker
+    region.flushcache();
+    assertEquals(4, countDeleteMarkers(region));
+    region.compactStores(false);
+    assertEquals(4, countDeleteMarkers(region));
+
+    // major compaction removes all, since there are no puts they affect
+    region.compactStores(true);
+    assertEquals(0, countDeleteMarkers(region));
+  }
+
+  /**
+   * Test delete marker removal from store files: markers survive as long as
+   * a put they affect is still present, and are collected by a major
+   * compaction once that put is gone.
+   */
+  public void testDeleteMarkerExpiration() throws Exception {
+    HTableDescriptor htd = createTableDescriptor(getName(), 0, 1,
+        HConstants.FOREVER, true);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    long ts = System.currentTimeMillis();
+
+    Put p = new Put(T1, ts);
+    p.add(c0, c0, T1);
+    region.put(p);
+
+    // a put into another store (CF) should have no effect
+    p = new Put(T1, ts-10);
+    p.add(c1, c0, T1);
+    region.put(p);
+
+    // all the following deletes affect the put
+    Delete d = new Delete(T1, ts, null);
+    d.deleteColumns(c0, c0, ts);
+    region.delete(d, null, true);
+
+    d = new Delete(T1, ts, null);
+    d.deleteFamily(c0, ts);
+    region.delete(d, null, true);
+
+    d = new Delete(T1, ts, null);
+    d.deleteColumn(c0, c0, ts+1);
+    region.delete(d, null, true);
+    
+    d = new Delete(T1, ts, null);
+    d.deleteColumn(c0, c0, ts+2);
+    region.delete(d, null, true);
+
+    // 1 family marker, 1 column marker, 2 version markers
+    assertEquals(4, countDeleteMarkers(region));
+
+    // neither flush nor minor compaction removes any marker
+    region.flushcache();
+    assertEquals(4, countDeleteMarkers(region));
+    region.compactStores(false);
+    assertEquals(4, countDeleteMarkers(region));
+
+    // another put will push out the earlier put...
+    p = new Put(T1, ts+3);
+    p.add(c0, c0, T1);
+    region.put(p);
+
+    region.flushcache();
+    // no markers are collected, since there is an affected put
+    region.compactStores(true);
+    assertEquals(4, countDeleteMarkers(region));
+
+    // the last compaction collected the earlier put,
+    // so after this compaction all markers are gone
+    region.compactStores(true);
+    assertEquals(0, countDeleteMarkers(region));
+  }
+
+  /**
+   * Verify correct range demarcation: gets whose time range ends at ts+2 and
+   * at ts+3 are compared for every row/family/qualifier combination after
+   * column and family deletes have been placed at ts+1.
+   */
+  public void testRanges() throws Exception {
+    HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
+        HConstants.FOREVER, true);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    // rows T1 and T2 each get value T1 at ts and value T2 at ts+1
+    // in all four family/qualifier combinations of c0 and c1
+    long ts = System.currentTimeMillis();
+    Put p = new Put(T1, ts);
+    p.add(c0, c0, T1);
+    p.add(c0, c1, T1);
+    p.add(c1, c0, T1);
+    p.add(c1, c1, T1);
+    region.put(p);
+
+    p = new Put(T2, ts);
+    p.add(c0, c0, T1);
+    p.add(c0, c1, T1);
+    p.add(c1, c0, T1);
+    p.add(c1, c1, T1);
+    region.put(p);
+
+    p = new Put(T1, ts+1);
+    p.add(c0, c0, T2);
+    p.add(c0, c1, T2);
+    p.add(c1, c0, T2);
+    p.add(c1, c1, T2);
+    region.put(p);
+
+    p = new Put(T2, ts+1);
+    p.add(c0, c0, T2);
+    p.add(c0, c1, T2);
+    p.add(c1, c0, T2);
+    p.add(c1, c1, T2);
+    region.put(p);
+
+    // row T1: column delete on c0:c0 at ts+1
+    Delete d = new Delete(T1, ts+1, null);
+    d.deleteColumns(c0, c0, ts+1);
+    region.delete(d, null, true);
+
+    // row T1: family delete on c1 at ts+1
+    d = new Delete(T1, ts+1, null);
+    d.deleteFamily(c1, ts+1);
+    region.delete(d, null, true);
+
+    // row T2: family delete on c0 at ts+1
+    d = new Delete(T2, ts+1, null);
+    d.deleteFamily(c0, ts+1);
+    region.delete(d, null, true);
+
+    // add an older delete, to make sure it is filtered
+    d = new Delete(T1, ts-10, null);
+    d.deleteFamily(c1, ts-10);
+    region.delete(d, null, true);
+
+    // ts + 2 does NOT include the delete at ts+1
+    checkGet(region, T1, c0, c0, ts+2, T2, T1);
+    checkGet(region, T1, c0, c1, ts+2, T2, T1);
+    checkGet(region, T1, c1, c0, ts+2, T2, T1);
+    checkGet(region, T1, c1, c1, ts+2, T2, T1);
+
+    checkGet(region, T2, c0, c0, ts+2, T2, T1);
+    checkGet(region, T2, c0, c1, ts+2, T2, T1);
+    checkGet(region, T2, c1, c0, ts+2, T2, T1);
+    checkGet(region, T2, c1, c1, ts+2, T2, T1);
+
+    // ts + 3 does: only the columns not covered by a delete still
+    // return their values
+    checkGet(region, T1, c0, c0, ts+3);
+    checkGet(region, T1, c0, c1, ts+3, T2, T1);
+    checkGet(region, T1, c1, c0, ts+3);
+    checkGet(region, T1, c1, c1, ts+3);
+
+    checkGet(region, T2, c0, c0, ts+3);
+    checkGet(region, T2, c0, c1, ts+3);
+    checkGet(region, T2, c1, c0, ts+3, T2, T1);
+    checkGet(region, T2, c1, c1, ts+3, T2, T1);
+  }
+
+  /**
+   * Verify that column/version delete markers are sorted
+   * with their respective puts and removed correctly by
+   * versioning (i.e. not relying on the store earliestPutTS).
+   */
+  public void testDeleteMarkerVersioning() throws Exception {
+    HTableDescriptor htd = createTableDescriptor(getName(), 0, 1,
+        HConstants.FOREVER, true);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    long ts = System.currentTimeMillis();
+    Put p = new Put(T1, ts);
+    p.add(c0, c0, T1);
+    region.put(p);
+
+    // this prevents marker collection based on earliestPut
+    // (cannot keep earliest put per column in the store file)
+    p = new Put(T1, ts-10);
+    p.add(c0, c1, T1);
+    region.put(p);
+    
+    Delete d = new Delete(T1, ts, null);
+    // test corner case (Put and Delete have same TS)
+    d.deleteColumns(c0, c0, ts);
+    region.delete(d, null, true);
+
+    d = new Delete(T1, ts+1, null);
+    d.deleteColumn(c0, c0, ts+1);
+    region.delete(d, null, true);
+    
+    d = new Delete(T1, ts+3, null);
+    d.deleteColumn(c0, c0, ts+3);
+    region.delete(d, null, true);
+
+    // flush and major compact twice: all three markers must survive
+    region.flushcache();
+    region.compactStores(true);
+    region.compactStores(true);
+    assertEquals(3, countDeleteMarkers(region));
+
+    // add two more puts, since max version is 1
+    // the 2nd put (and all delete markers following)
+    // will be removed.
+    p = new Put(T1, ts+2);
+    p.add(c0, c0, T2);
+    region.put(p);
+    
+    // delete, put, delete, delete, put
+    assertEquals(3, countDeleteMarkers(region));
+
+    p = new Put(T1, ts+3);
+    p.add(c0, c0, T3);
+    region.put(p);
+
+    // This is potentially questionable behavior.
+    // This could be changed by not letting the ScanQueryMatcher
+    // return SEEK_NEXT_COL if a put is past VERSIONS, but instead
+    // return SKIP if the store has KEEP_DELETED_CELLS set.
+    //
+    // As it stands, the 1 here is correct.
+    // There are two puts, VERSIONS is one, so after the 1st put the scanner
+    // knows that there can be no more KVs (put or delete) that have any effect.
+    //
+    // delete, put, put | delete, delete
+    assertEquals(1, countDeleteMarkers(region));
+
+    // flush cache only sees what is in the memstore
+    region.flushcache();
+
+    // Here we have the three markers again, because the flush above
+    // removed the 2nd put before the file is written.
+    // So there's only one put, and hence the deletes already in the store
+    // files cannot be removed safely.
+    // delete, put, delete, delete
+    assertEquals(3, countDeleteMarkers(region));
+
+    region.compactStores(true);
+    assertEquals(3, countDeleteMarkers(region));
+
+    // add one more put
+    p = new Put(T1, ts+4);
+    p.add(c0, c0, T4);
+    region.put(p);
+
+    region.flushcache();
+    // one trailing delete marker remains (but only one)
+    // because delete markers do not increase the version count
+    assertEquals(1, countDeleteMarkers(region));
+    region.compactStores(true);
+    region.compactStores(true);
+    assertEquals(1, countDeleteMarkers(region));
+  }
+
+  /**
+   * Scenario with several column families and qualifiers per row: after a
+   * whole-row delete, a scan whose time range ends just past the row's
+   * put timestamp must still return all four cells of that row.
+   */
+  public void testWithMixedCFs() throws Exception {
+    HTableDescriptor tableDesc = createTableDescriptor(getName(), 0, 1,
+        HConstants.FOREVER, true);
+    HRegion region = createNewHRegion(tableDesc, null, null);
+
+    final long now = System.currentTimeMillis();
+    // visited in the order c0:c0, c0:c1, c1:c0, c1:c1
+    final byte[][] names = { c0, c1 };
+
+    // row T1 is written at "now", value T1 in every family/qualifier pair
+    Put firstPut = new Put(T1, now);
+    for (byte[] family : names) {
+      for (byte[] qualifier : names) {
+        firstPut.add(family, qualifier, T1);
+      }
+    }
+    region.put(firstPut);
+
+    // row T2 is written one millisecond later, value T2 everywhere
+    Put secondPut = new Put(T2, now + 1);
+    for (byte[] family : names) {
+      for (byte[] qualifier : names) {
+        secondPut.add(family, qualifier, T2);
+      }
+    }
+    region.put(secondPut);
+
+    // a delete without explicit columns places a family marker in each family
+    region.delete(new Delete(T1, now, null), null, true);
+    region.delete(new Delete(T2, now + 1, null), null, true);
+
+    // scan starting at row T1, time range [0, now+1)
+    Scan firstScan = new Scan(T1);
+    firstScan.setTimeRange(0, now + 1);
+    InternalScanner firstScanner = region.getScanner(firstScan);
+    List<KeyValue> firstRow = new ArrayList<KeyValue>();
+    firstScanner.next(firstRow);
+    assertEquals(4, firstRow.size());
+    firstScanner.close();
+
+    // scan starting at row T2, time range [0, now+2)
+    Scan secondScan = new Scan(T2);
+    secondScan.setTimeRange(0, now + 2);
+    InternalScanner secondScanner = region.getScanner(secondScan);
+    List<KeyValue> secondRow = new ArrayList<KeyValue>();
+    secondScanner.next(secondRow);
+    assertEquals(4, secondRow.size());
+    secondScanner.close();
+  }
+
+  /**
+   * Test keeping deleted rows together with min versions set.
+   * @throws Exception
+   */
+  public void testWithMinVersions() throws Exception {
+    // NOTE(review): the arguments look like min versions = 3,
+    // max versions = 1000 and a TTL of 1 -- confirm against
+    // HBaseTestCase.createTableDescriptor
+    HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1, true);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    long ts = System.currentTimeMillis() - 2000; // 2s in the past
+
+    // four versions of T1/c0:c0 at ts-4, ts-3, ts-1 and ts
+    Put p = new Put(T1, ts);
+    p.add(c0, c0, T3);
+    region.put(p);
+    p = new Put(T1, ts-1);
+    p.add(c0, c0, T2);
+    region.put(p);
+    p = new Put(T1, ts-3);
+    p.add(c0, c0, T1);
+    region.put(p);
+    p = new Put(T1, ts-4);
+    p.add(c0, c0, T0);
+    region.put(p);
+
+    // all puts now are just retained because of min versions = 3
+
+    // place a family delete marker
+    Delete d = new Delete(T1, ts-1, null);
+    region.delete(d, null, true);
+    // and a column delete marker
+    d = new Delete(T1, ts-2, null);
+    d.deleteColumns(c0, c0, ts-1);
+    region.delete(d, null, true);
+
+    // a get limited to timestamps before ts-2 sees both old values
+    Get g = new Get(T1);
+    g.setMaxVersions();
+    g.setTimeRange(0L, ts-2);
+    Result r = region.get(g, null);
+    checkResult(r, c0, c0, T1,T0);
+
+    // 3 families, one column delete marker
+    assertEquals(4, countDeleteMarkers(region));
+
+    region.flushcache();
+    // no delete marker is removed by the flush
+    assertEquals(4, countDeleteMarkers(region));
+
+    // after the flush only T1 is returned for the same time range
+    r = region.get(g, null);
+    checkResult(r, c0, c0, T1);
+    p = new Put(T1, ts+1);
+    p.add(c0, c0, T4);
+    region.put(p);
+    region.flushcache();
+
+    assertEquals(4, countDeleteMarkers(region));
+
+    r = region.get(g, null);
+    checkResult(r, c0, c0, T1);
+
+    // this will push out the last put before
+    // family delete marker
+    p = new Put(T1, ts+2);
+    p.add(c0, c0, T5);
+    region.put(p);
+
+    region.flushcache();
+    region.compactStores(true);
+    // the two family markers without puts are gone
+    assertEquals(2, countDeleteMarkers(region));
+
+    // the last compactStores updated the earliestPutTs,
+    // so after the next compaction the last family delete marker is also gone
+    region.compactStores(true);
+    assertEquals(0, countDeleteMarkers(region));
+  }
+
+  /**
+   * Reads all versions of one column of one row, restricted to the time
+   * range that starts at 0 and ends at <code>time</code>, and checks the
+   * result against the expected values via <code>checkResult</code>.
+   *
+   * @param region region to read from
+   * @param row row key
+   * @param fam column family
+   * @param col column qualifier
+   * @param time upper bound handed to <code>Get.setTimeRange</code>
+   * @param vals expected values, in the order the cells are returned;
+   *          none if the get is expected to be empty
+   * @throws IOException if the read fails
+   */
+  private void checkGet(HRegion region, byte[] row, byte[] fam, byte[] col,
+      long time, byte[]... vals) throws IOException {
+    Get get = new Get(row);
+    get.addColumn(fam, col);
+    get.setMaxVersions();
+    get.setTimeRange(0L, time);
+    checkResult(region.get(get, null), fam, col, vals);
+  }
+
+  /**
+   * Runs a raw, all-versions scan over the whole region and counts the
+   * KeyValues for which <code>isDelete()</code> is true.
+   *
+   * @param region region to scan
+   * @return number of delete markers the raw scan returned
+   * @throws IOException if scanning fails
+   */
+  private int countDeleteMarkers(HRegion region) throws IOException {
+    Scan rawScan = new Scan();
+    rawScan.setRaw(true);
+    rawScan.setMaxVersions();
+    InternalScanner scanner = region.getScanner(rawScan);
+    List<KeyValue> batch = new ArrayList<KeyValue>();
+    int markers = 0;
+    boolean more = true;
+    while (more) {
+      // the batch returned together with "no more rows" is counted as well
+      more = scanner.next(batch);
+      for (KeyValue kv : batch) {
+        if (kv.isDelete()) {
+          markers++;
+        }
+      }
+      batch.clear();
+    }
+    scanner.close();
+    return markers;
+  }
+
+  /**
+   * Asserts that a result consists of exactly the expected cells of a
+   * single column.
+   *
+   * @param r result to inspect
+   * @param fam column family of the expected cells
+   * @param col column qualifier of the expected cells
+   * @param vals expected cell values, in the order the result returns them;
+   *          the result must hold exactly this many cells in total
+   */
+  private void checkResult(Result r, byte[] fam, byte[] col, byte[] ... vals) {
+    // expected value goes first so that failure messages read correctly
+    assertEquals(vals.length, r.size());
+    List<KeyValue> kvs = r.getColumn(fam, col);
+    assertEquals(vals.length, kvs.size());
+    for (int i=0;i<vals.length;i++) {
+      assertEquals(vals[i], kvs.get(i).getValue());
+    }
+  }
+
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java Fri Oct 21 19:57:07 2011
@@ -26,8 +26,6 @@ import java.rmi.UnexpectedException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.NavigableSet;
-import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicReference;
 
 import junit.framework.TestCase;
@@ -39,8 +37,9 @@ import org.apache.hadoop.hbase.HBaseConf
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueTestUtil;
-import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
 import org.apache.hadoop.hbase.util.Bytes;
 
 import com.google.common.base.Joiner;
@@ -89,8 +88,10 @@ public class TestMemStore extends TestCa
     Scan scan = new Scan();
     List<KeyValue> result = new ArrayList<KeyValue>();
     ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
-    StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
-      this.memstore.comparator, null, memstorescanners);
+    ScanInfo scanInfo = new ScanInfo(null, 0, 1, HConstants.LATEST_TIMESTAMP, false,
+        this.memstore.comparator);
+    ScanType scanType = ScanType.USER_SCAN;
+    StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
     int count = 0;
     try {
       while (s.next(result)) {
@@ -111,8 +112,7 @@ public class TestMemStore extends TestCa
     ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
     memstorescanners = this.memstore.getScanners();
     // Now assert can count same number even if a snapshot mid-scan.
-    s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
-      this.memstore.comparator, null, memstorescanners);
+    s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
     count = 0;
     try {
       while (s.next(result)) {
@@ -138,8 +138,7 @@ public class TestMemStore extends TestCa
     memstorescanners = this.memstore.getScanners();
     // Assert that new values are seen in kvset as we scan.
     long ts = System.currentTimeMillis();
-    s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
-      this.memstore.comparator, null, memstorescanners);
+    s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
     count = 0;
     int snapshotIndex = 5;
     try {
@@ -553,10 +552,12 @@ public class TestMemStore extends TestCa
     }
     //starting from each row, validate results should contain the starting row
     for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
-      InternalScanner scanner =
-          new StoreScanner(new Scan(Bytes.toBytes(startRowId)), FAMILY,
-              Integer.MAX_VALUE, this.memstore.comparator, null,
-              memstore.getScanners());
+      ScanInfo scanInfo = new ScanInfo(FAMILY, 0, 1, Integer.MAX_VALUE, false,
+          this.memstore.comparator);
+      ScanType scanType = ScanType.USER_SCAN;
+      InternalScanner scanner = new StoreScanner(new Scan(
+          Bytes.toBytes(startRowId)), scanInfo, scanType, null,
+          memstore.getScanners());
       List<KeyValue> results = new ArrayList<KeyValue>();
       for (int i = 0; scanner.next(results); i++) {
         int rowId = startRowId + i;