Posted to commits@hbase.apache.org by st...@apache.org on 2011/08/24 07:15:37 UTC

svn commit: r1160978 - in /hbase/trunk: ./ src/main/java/org/apache/hadoop/hbase/ src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/ruby/hbase/ src/test/java/org/apache/hadoop/hbase/ src/test/java/org/apache/hadoop/hbase/regionserver/

Author: stack
Date: Wed Aug 24 05:15:37 2011
New Revision: 1160978

URL: http://svn.apache.org/viewvc?rev=1160978&view=rev
Log:
HBASE-4071 Data GC: Remove all versions > TTL EXCEPT the last written version
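
In short: a version is now collected only when it is past its TTL AND
at least MIN_VERSIONS newer versions of the same column survive (the
existing VERSIONS cap still applies). For example, with TTL=1s and
MIN_VERSIONS=2, a row holding four versions that are all older than
the TTL still answers Gets with its two newest versions, and a major
compaction removes only the other two -- exactly what the new
TestMinVersions#testBaseCase below verifies.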

Added:
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java
Modified:
    hbase/trunk/CHANGES.txt
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
    hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
    hbase/trunk/src/main/ruby/hbase/admin.rb
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
    hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java

Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1160978&r1=1160977&r2=1160978&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Aug 24 05:15:37 2011
@@ -453,6 +453,8 @@ Release 0.91.0 - Unreleased
    HBASE-4176  Exposing HBase Filters to the Thrift API (Anirudh Todi)
    HBASE-4221  Changes necessary to build and run against Hadoop 0.23
                (todd)
+   HBASE-4071  Data GC: Remove all versions > TTL EXCEPT the last
+               written version (Lars Hofhansl)
 
 Release 0.90.5 - Unreleased
 

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java?rev=1160978&r1=1160977&r2=1160978&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java Wed Aug 24 05:15:37 2011
@@ -87,6 +87,7 @@ public class HColumnDescriptor implement
   public static final String BLOOMFILTER = "BLOOMFILTER";
   public static final String FOREVER = "FOREVER";
   public static final String REPLICATION_SCOPE = "REPLICATION_SCOPE";
+  public static final String MIN_VERSIONS = "MIN_VERSIONS";
 
   /**
    * Default compression type.
@@ -99,6 +100,11 @@ public class HColumnDescriptor implement
    */
   public static final int DEFAULT_VERSIONS = 3;
 
+  /**
+   * Default is not to keep a minimum number of versions.
+   */
+  public static final int DEFAULT_MIN_VERSIONS = 0;
+
   /*
    * Cache here the HCD value.
   * Question: is it OK to cache, since when we re-enable we create a new HCD?
@@ -259,6 +265,37 @@ public class HColumnDescriptor implement
       final String compression, final boolean inMemory,
       final boolean blockCacheEnabled, final int blocksize,
       final int timeToLive, final String bloomFilter, final int scope) {
+    this(familyName, DEFAULT_MIN_VERSIONS, maxVersions, compression, inMemory,
+        blockCacheEnabled, blocksize, timeToLive, bloomFilter, scope);
+  }
+
+  /**
+   * Constructor
+   * @param familyName Column family name. Must be 'printable' -- digit or
+   * letter -- and may not contain a <code>:</code>
+   * @param minVersions Minimum number of versions to keep
+   * @param maxVersions Maximum number of versions to keep
+   * @param compression Compression type
+   * @param inMemory If true, column data should be kept in an HRegionServer's
+   * cache
+   * @param blockCacheEnabled If true, MapFile blocks should be cached
+   * @param blocksize Block size to use when writing out storefiles.  Use
+   * smaller blocksizes for faster random-access at expense of larger indices
+   * (more memory consumption).  Default is usually 64k.
+   * @param timeToLive Time-to-live of cell contents, in seconds
+   * (use HConstants.FOREVER for unlimited TTL)
+   * @param bloomFilter Bloom filter type for this column
+   * @param scope The scope tag for this column
+   *
+   * @throws IllegalArgumentException if passed a family name that is made of
+   * other than 'word' characters: i.e. <code>[a-zA-Z_0-9]</code> or contains
+   * a <code>:</code>
+   * @throws IllegalArgumentException if the number of versions is &lt;= 0
+   */
+  public HColumnDescriptor(final byte [] familyName, final int minVersions,
+      final int maxVersions, final String compression, final boolean inMemory,
+      final boolean blockCacheEnabled, final int blocksize,
+      final int timeToLive, final String bloomFilter, final int scope) {
     isLegalFamilyName(familyName);
     this.name = familyName;
 
@@ -267,7 +304,19 @@ public class HColumnDescriptor implement
       // Until there is support, consider 0 or < 0 -- a configuration error.
       throw new IllegalArgumentException("Maximum versions must be positive");
     }
+
+    if (minVersions > 0) {
+      if (timeToLive == HConstants.FOREVER) {
+        throw new IllegalArgumentException("Minimum versions requires TTL.");
+      }
+      if (minVersions > maxVersions) {
+        throw new IllegalArgumentException("Minimum versions must be <= "+
+            "maximum versions.");
+      }
+    }
+
     setMaxVersions(maxVersions);
+    setMinVersions(minVersions);
     setInMemory(inMemory);
     setBlockCacheEnabled(blockCacheEnabled);
     setTimeToLive(timeToLive);
@@ -509,6 +558,22 @@ public class HColumnDescriptor implement
   }
 
   /**
+   * @return The minimum number of versions to keep.
+   */
+  public int getMinVersions() {
+    String value = getValue(MIN_VERSIONS);
+    return (value != null)? Integer.valueOf(value).intValue(): 0;
+  }
+
+  /**
+   * @param minVersions The minimum number of versions to keep.
+   * (used when timeToLive is set)
+   */
+  public void setMinVersions(int minVersions) {
+    setValue(MIN_VERSIONS, Integer.toString(minVersions));
+  }
+
+  /**
    * @return True if MapFile blocks should be cached.
    */
   public boolean isBlockCacheEnabled() {
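
The new ten-argument constructor can be exercised as below. This is a
sketch only: the class name and values are illustrative, and the
remaining arguments just reuse the defaults that appear elsewhere in
this commit.

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MinVersionsExample {
      public static void main(String[] args) {
        // Keep at most 1000 versions, expire after a day (86400s), but
        // always retain the two newest versions even once they expire.
        HColumnDescriptor family = new HColumnDescriptor(
            Bytes.toBytes("f"),
            2,      // minVersions -- new in this patch
            1000,   // maxVersions
            HColumnDescriptor.DEFAULT_COMPRESSION,
            false,  // inMemory
            false,  // blockCacheEnabled
            HColumnDescriptor.DEFAULT_BLOCKSIZE,
            86400,  // timeToLive, in seconds
            HColumnDescriptor.DEFAULT_BLOOMFILTER,
            HConstants.REPLICATION_SCOPE_LOCAL);
        System.out.println(family.getMinVersions());  // prints 2
      }
    }

Per the validation above, passing HConstants.FOREVER for timeToLive
with a positive minVersions, or a minVersions larger than maxVersions,
makes the constructor throw an IllegalArgumentException.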

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java?rev=1160978&r1=1160977&r2=1160978&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java Wed Aug 24 05:15:37 2011
@@ -19,10 +19,12 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
+
 /**
  * Implementing classes of this interface will be used for the tracking
- * and enforcement of columns and numbers of versions during the course of a
- * Get or Scan operation.
+ * and enforcement of columns and numbers of versions and timeToLive during
+ * the course of a Get or Scan operation.
  * <p>
  * Currently there are two different types of Store/Family-level queries.
  * <ul><li>{@link ExplicitColumnTracker} is used when the query specifies
@@ -42,11 +44,11 @@ public interface ColumnTracker {
    * @param bytes
    * @param offset
    * @param length
-   * @param timestamp
+   * @param ttl The timeToLive to enforce.
    * @return The match code instance.
    */
   public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
-      int length, long timestamp);
+      int length, long ttl);
 
   /**
    * Updates internal variables in between files
@@ -76,4 +78,19 @@ public interface ColumnTracker {
    * @return null, or a ColumnCount that we should seek to
    */
   public ColumnCount getColumnHint();
+
+  /**
+   * Retrieve the MatchCode for the next row or column
+   */
+  public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
+      int qualLength);
+
+  /**
+   * Give the tracker a chance to declare it's done based only on the timestamp
+   * to allow an early out.
+   *
+   * @param timestamp
+   * @return <code>true</code> to early out based on timestamp.
+   */
+  public boolean isDone(long timestamp);
 }
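
Together, isDone() and getNextRowOrNextColumn() let ScanQueryMatcher
treat both tracker implementations uniformly: the TTL early-out and
the follow-up seek decision become plain interface calls, which is
what allows the matcher's instanceof-based getNextRowOrNextColumn()
and its own oldestStamp/isExpired bookkeeping to be deleted further
down.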

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java?rev=1160978&r1=1160977&r2=1160978&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java Wed Aug 24 05:15:37 2011
@@ -24,6 +24,7 @@ import java.util.List;
 import java.util.NavigableSet;
 
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
 import org.apache.hadoop.hbase.util.Bytes;
 
 /**
@@ -48,6 +49,7 @@ import org.apache.hadoop.hbase.util.Byte
 public class ExplicitColumnTracker implements ColumnTracker {
 
   private final int maxVersions;
+  private final int minVersions;
   private final List<ColumnCount> columns;
   private final List<ColumnCount> columnsToReuse;
   private int index;
@@ -55,23 +57,29 @@ public class ExplicitColumnTracker imple
   /** Keeps track of the latest timestamp included for current column.
    * Used to eliminate duplicates. */
   private long latestTSOfCurrentColumn;
+  private long oldestStamp;
 
   /**
    * Default constructor.
   * @param columns columns specified by the user in the query
+   * @param minVersions minimum number of versions to keep
    * @param maxVersions maximum versions to return per column
+   * @param ttl The timeToLive to enforce
    */
-  public ExplicitColumnTracker(NavigableSet<byte[]> columns, int maxVersions) {
+  public ExplicitColumnTracker(NavigableSet<byte[]> columns, int minVersions,
+      int maxVersions, long ttl) {
     this.maxVersions = maxVersions;
+    this.minVersions = minVersions;
+    this.oldestStamp = System.currentTimeMillis() - ttl;
     this.columns = new ArrayList<ColumnCount>(columns.size());
     this.columnsToReuse = new ArrayList<ColumnCount>(columns.size());
     for(byte [] column : columns) {
-      this.columnsToReuse.add(new ColumnCount(column,maxVersions));
+      this.columnsToReuse.add(new ColumnCount(column));
     }
     reset();
   }
 
-  /**
+    /**
    * Done when there are no more columns to match against.
    */
   public boolean done() {
@@ -108,7 +116,7 @@ public class ExplicitColumnTracker imple
       int ret = Bytes.compareTo(column.getBuffer(), column.getOffset(),
           column.getLength(), bytes, offset, length);
 
-      // Column Matches. If it is not a duplicate key, decrement versions left
+      // Column Matches. If it is not a duplicate key, increment the version count
       // and include.
       if(ret == 0) {
         //If column matches, check if it is a duplicate timestamp
@@ -116,7 +124,8 @@ public class ExplicitColumnTracker imple
           //If duplicate, skip this Key
           return ScanQueryMatcher.MatchCode.SKIP;
         }
-        if(this.column.decrement() == 0) {
+        int count = this.column.increment();
+        if(count >= maxVersions || (count >= minVersions && isExpired(timestamp))) {
           // Done with versions for this column
           this.columns.remove(this.index);
           resetTS();
@@ -185,11 +194,15 @@ public class ExplicitColumnTracker imple
     return timestamp == latestTSOfCurrentColumn;
   }
 
+  private boolean isExpired(long timestamp) {
+    return timestamp < oldestStamp;
+  }
+
   private void buildColumnList() {
     this.columns.clear();
     this.columns.addAll(this.columnsToReuse);
     for(ColumnCount col : this.columns) {
-      col.setCount(this.maxVersions);
+      col.setCount(0);
     }
   }
 
@@ -227,5 +240,18 @@ public class ExplicitColumnTracker imple
       }
     }
   }
+  public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
+      int qualLength) {
+    doneWithColumn(bytes, offset,qualLength);
+
+    if (getColumnHint() == null) {
+      return MatchCode.SEEK_NEXT_ROW;
+    } else {
+      return MatchCode.SEEK_NEXT_COL;
+    }
+  }
 
+  public boolean isDone(long timestamp) {
+    return minVersions <=0 && isExpired(timestamp);
+  }
 }
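
Note that the version counting flipped direction here: ColumnCount
used to start at maxVersions and decrement toward zero, whereas it now
starts at zero and increments, so a single counter can be tested
against both limits. The stop condition above, restated as a
standalone predicate (a sketch; count includes the version currently
being examined):

    // True once a column needs no further versions: either the cap is
    // reached, or the minimum is satisfied and the rest are expired.
    static boolean doneWithColumn(int count, boolean expired,
        int minVersions, int maxVersions) {
      return count >= maxVersions || (count >= minVersions && expired);
    }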

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java?rev=1160978&r1=1160977&r2=1160978&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java Wed Aug 24 05:15:37 2011
@@ -1297,9 +1297,8 @@ public class HRegion implements HeapSize
     this.readRequestsCount.increment();
     try {
       Store store = getStore(family);
-      KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
       // get the closest key. (HStore.getRowKeyAtOrBefore can return null)
-      KeyValue key = store.getRowKeyAtOrBefore(kv);
+      KeyValue key = store.getRowKeyAtOrBefore(row);
       Result result = null;
       if (key != null) {
         Get get = new Get(key.getRow());

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1160978&r1=1160977&r2=1160978&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Wed Aug 24 05:15:37 2011
@@ -53,9 +53,6 @@ public class ScanQueryMatcher {
   /** Key to seek to in memstore and StoreFiles */
   protected KeyValue startKey;
 
-  /** Oldest allowed version stamp for TTL enforcement */
-  protected long oldestStamp;
-
   /** Row comparator for the region this query is for */
   KeyValue.KeyComparator rowComparator;
 
@@ -72,10 +69,9 @@ public class ScanQueryMatcher {
    */
   public ScanQueryMatcher(Scan scan, byte [] family,
       NavigableSet<byte[]> columns, long ttl,
-      KeyValue.KeyComparator rowComparator, int maxVersions,
+      KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions,
       boolean retainDeletesInOutput) {
     this.tr = scan.getTimeRange();
-    this.oldestStamp = System.currentTimeMillis() - ttl;
     this.rowComparator = rowComparator;
     this.deletes =  new ScanDeleteTracker();
     this.stopRow = scan.getStopRow();
@@ -86,19 +82,26 @@ public class ScanQueryMatcher {
     // Single branch to deal with two types of reads (columns vs all in family)
     if (columns == null || columns.size() == 0) {
       // use a specialized scan for wildcard column tracker.
-      this.columns = new ScanWildcardColumnTracker(maxVersions);
+      this.columns = new ScanWildcardColumnTracker(minVersions, maxVersions, ttl);
     } else {
       // We can share the ExplicitColumnTracker, diff is we reset
       // between rows, not between storefiles.
-      this.columns = new ExplicitColumnTracker(columns,maxVersions);
+      this.columns = new ExplicitColumnTracker(columns, minVersions, maxVersions,
+          ttl);
     }
   }
   public ScanQueryMatcher(Scan scan, byte [] family,
       NavigableSet<byte[]> columns, long ttl,
-      KeyValue.KeyComparator rowComparator, int maxVersions) {
+      KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions) {
       /* By default we will not include deletes */
       /* deletes are included explicitly (for minor compaction) */
-      this(scan, family, columns, ttl, rowComparator, maxVersions, false);
+      this(scan, family, columns, ttl, rowComparator, minVersions, maxVersions,
+          false);
+  }
+  public ScanQueryMatcher(Scan scan, byte [] family,
+      NavigableSet<byte[]> columns, long ttl,
+      KeyValue.KeyComparator rowComparator, int maxVersions) {
+    this(scan, family, columns, ttl, rowComparator, 0, maxVersions);
   }
 
   /**
@@ -158,9 +161,9 @@ public class ScanQueryMatcher {
       (offset - initialOffset) - KeyValue.TIMESTAMP_TYPE_SIZE;
 
     long timestamp = kv.getTimestamp();
-    if (isExpired(timestamp)) {
-      // done, the rest of this column will also be expired as well.
-      return getNextRowOrNextColumn(bytes, offset, qualLength);
+    // check for early out based on timestamp alone
+    if (columns.isDone(timestamp)) {
+        return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
     }
 
     byte type = kv.getType();
@@ -192,7 +195,7 @@ public class ScanQueryMatcher {
     if (timestampComparison >= 1) {
       return MatchCode.SKIP;
     } else if (timestampComparison <= -1) {
-      return getNextRowOrNextColumn(bytes, offset, qualLength);
+      return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
     }
 
     /**
@@ -206,7 +209,7 @@ public class ScanQueryMatcher {
       if (filterResponse == ReturnCode.SKIP) {
         return MatchCode.SKIP;
       } else if (filterResponse == ReturnCode.NEXT_COL) {
-        return getNextRowOrNextColumn(bytes, offset, qualLength);
+        return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
       } else if (filterResponse == ReturnCode.NEXT_ROW) {
         stickyNextRow = true;
         return MatchCode.SEEK_NEXT_ROW;
@@ -228,23 +231,6 @@ public class ScanQueryMatcher {
 
   }
 
-  public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
-      int qualLength) {
-    if (columns instanceof ExplicitColumnTracker) {
-      //We only come here when we know that columns is an instance of
-      //ExplicitColumnTracker so we should never have a cast exception
-      ((ExplicitColumnTracker)columns).doneWithColumn(bytes, offset,
-          qualLength);
-      if (columns.getColumnHint() == null) {
-        return MatchCode.SEEK_NEXT_ROW;
-      } else {
-        return MatchCode.SEEK_NEXT_COL;
-      }
-    } else {
-      return MatchCode.SEEK_NEXT_COL;
-    }
-  }
-
   public boolean moreRowsMayExistAfter(KeyValue kv) {
     if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) &&
         rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(),
@@ -278,10 +264,6 @@ public class ScanQueryMatcher {
     return (type != KeyValue.Type.Put.getCode());
   }
 
-  protected boolean isExpired(long timestamp) {
-    return (timestamp < oldestStamp);
-  }
-
   /**
    *
    * @return the start key

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java?rev=1160978&r1=1160977&r2=1160978&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java Wed Aug 24 05:15:37 2011
@@ -37,16 +37,22 @@ public class ScanWildcardColumnTracker i
   private int columnLength = 0;
   private int currentCount = 0;
   private int maxVersions;
+  private int minVersions;
   /* Keeps track of the latest timestamp included for current column.
    * Used to eliminate duplicates. */
   private long latestTSOfCurrentColumn;
+  private long oldestStamp;
 
   /**
    * Return maxVersions of every row.
-   * @param maxVersion
+   * @param minVersion Minimum number of versions to keep
+   * @param maxVersion Maximum number of versions to return
+   * @param ttl TimeToLive to enforce
    */
-  public ScanWildcardColumnTracker(int maxVersion) {
+  public ScanWildcardColumnTracker(int minVersion, int maxVersion, long ttl) {
     this.maxVersions = maxVersion;
+    this.minVersions = minVersion;
+    this.oldestStamp = System.currentTimeMillis() - ttl;
   }
 
   /**
@@ -65,16 +71,8 @@ public class ScanWildcardColumnTracker i
       long timestamp) {
     if (columnBuffer == null) {
       // first iteration.
-      columnBuffer = bytes;
-      columnOffset = offset;
-      columnLength = length;
-      currentCount = 0;
-
-      if (++currentCount > maxVersions) {
-        return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
-      }
-      setTS(timestamp);
-      return ScanQueryMatcher.MatchCode.INCLUDE;
+      resetBuffer(bytes, offset, length);
+      return checkVersion(++currentCount, timestamp);
     }
     int cmp = Bytes.compareTo(bytes, offset, length,
         columnBuffer, columnOffset, columnLength);
@@ -83,11 +81,7 @@ public class ScanWildcardColumnTracker i
       if (sameAsPreviousTS(timestamp)) {
         return ScanQueryMatcher.MatchCode.SKIP;
       }
-      if (++currentCount > maxVersions) {
-        return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; // skip to next col
-      }
-      setTS(timestamp);
-      return ScanQueryMatcher.MatchCode.INCLUDE;
+      return checkVersion(++currentCount, timestamp);
     }
 
     resetTS();
@@ -95,14 +89,8 @@ public class ScanWildcardColumnTracker i
     // new col > old col
     if (cmp > 0) {
       // switched columns, let's do something.
-      columnBuffer = bytes;
-      columnOffset = offset;
-      columnLength = length;
-      currentCount = 0;
-      if (++currentCount > maxVersions)
-        return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
-      setTS(timestamp);
-      return ScanQueryMatcher.MatchCode.INCLUDE;
+      resetBuffer(bytes, offset, length);
+      return checkVersion(++currentCount, timestamp);
     }
 
     // new col < oldcol
@@ -111,18 +99,32 @@ public class ScanWildcardColumnTracker i
     // was incorrectly stored into the store for this one. Continue, but
     // complain.
     LOG.error("ScanWildcardColumnTracker.checkColumn ran " +
-  		"into a column actually smaller than the previous column: " +
+        "into a column actually smaller than the previous column: " +
       Bytes.toStringBinary(bytes, offset, length));
     // switched columns
+    resetBuffer(bytes, offset, length);
+    return checkVersion(++currentCount, timestamp);
+  }
+
+  private void resetBuffer(byte[] bytes, int offset, int length) {
     columnBuffer = bytes;
     columnOffset = offset;
     columnLength = length;
     currentCount = 0;
-    if (++currentCount > maxVersions) {
-      return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL;
+  }
+
+  private MatchCode checkVersion(int version, long timestamp) {
+    if (version > maxVersions) {
+      return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; // skip to next col
+    }
+    // keep the KV if required by minversions or it is not expired, yet
+    if (version <= minVersions || !isExpired(timestamp)) {
+      setTS(timestamp);
+      return ScanQueryMatcher.MatchCode.INCLUDE;
+    } else {
+      return MatchCode.SEEK_NEXT_COL;
     }
-    setTS(timestamp);
-    return ScanQueryMatcher.MatchCode.INCLUDE;
+
   }
 
   @Override
@@ -150,6 +152,10 @@ public class ScanWildcardColumnTracker i
     return timestamp == latestTSOfCurrentColumn;
   }
 
+  private boolean isExpired(long timestamp) {
+    return timestamp < oldestStamp;
+  }
+
   /**
    * Used by matcher and scan/get to get a hint of the next column
    * to seek to after checkColumn() returns SKIP.  Returns the next interesting
@@ -170,4 +176,14 @@ public class ScanWildcardColumnTracker i
   public boolean done() {
     return false;
   }
+
+  public MatchCode getNextRowOrNextColumn(byte[] bytes, int offset,
+      int qualLength) {
+    return MatchCode.SEEK_NEXT_COL;
+  }
+
+  public boolean isDone(long timestamp) {
+    return minVersions <=0 && isExpired(timestamp);
+  }
+
 }
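
The consolidated checkVersion() makes the interplay of the limits easy
to trace. Restated as a sketch (MatchCode names returned as strings
for brevity), with a worked run assuming every stored version is
already past its TTL:

    // Mirrors checkVersion(); version numbers count from the newest.
    static String checkVersion(int version, boolean expired,
        int minVersions, int maxVersions) {
      if (version > maxVersions) return "SEEK_NEXT_COL";
      if (version <= minVersions || !expired) return "INCLUDE";
      return "SEEK_NEXT_COL";
    }

    // With minVersions=2, maxVersions=5 and all versions expired:
    //   checkVersion(1, true, 2, 5) -> "INCLUDE"       kept despite TTL
    //   checkVersion(2, true, 2, 5) -> "INCLUDE"       kept despite TTL
    //   checkVersion(3, true, 2, 5) -> "SEEK_NEXT_COL" collectible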

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1160978&r1=1160977&r2=1160978&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Wed Aug 24 05:15:37 2011
@@ -40,7 +40,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.*;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.HeapSize;
-import org.apache.hadoop.hbase.io.hfile.BlockCache;
 import org.apache.hadoop.hbase.io.hfile.Compression;
 import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
@@ -52,8 +51,6 @@ import org.apache.hadoop.hbase.util.Envi
 import org.apache.hadoop.util.StringUtils;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Collections2;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
@@ -92,6 +89,7 @@ public class Store implements HeapSize {
   final Configuration conf;
   // ttl in milliseconds.
   protected long ttl;
+  protected int minVersions;
   long majorCompactionTime;
   private final int minFilesToCompact;
   private final int maxFilesToCompact;
@@ -179,6 +177,7 @@ public class Store implements HeapSize {
       // second -> ms adjust for user data
       this.ttl *= 1000;
     }
+    this.minVersions = family.getMinVersions();
     this.memstore = new MemStore(conf, this.comparator);
     this.storeNameStr = Bytes.toString(this.family.getName());
 
@@ -491,12 +490,14 @@ public class Store implements HeapSize {
       // A. Write the map out to the disk
       writer = createWriterInTmp(set.size());
       writer.setTimeRangeTracker(snapshotTimeRangeTracker);
-      int entries = 0;
       try {
         for (KeyValue kv: set) {
-          if (!isExpired(kv, oldestTimestamp)) {
+          // If minVersions > 0 we will wait until the next compaction to
+          // collect expired KVs (following the logic for maxVersions).
+          // TODO: As Jonathan Gray points out, this can be optimized
+          // (see HBASE-4241)
+          if (minVersions > 0 || !isExpired(kv, oldestTimestamp)) {
             writer.append(kv);
-            entries++;
             flushed += this.memstore.heapSizeChange(kv, true);
           }
         }
@@ -717,7 +718,7 @@ public class Store implements HeapSize {
       // Ready to go. Have list of files to compact.
       StoreFile.Writer writer = compactStore(filesToCompact, isMajor, maxId);
       // Move the compaction into place.
-      StoreFile sf = completeCompaction(filesToCompact, writer);
+      completeCompaction(filesToCompact, writer);
     } finally {
       synchronized (filesCompacting) {
         filesCompacting.removeAll(filesToCompact);
@@ -1267,14 +1268,23 @@ public class Store implements HeapSize {
    * current container: i.e. we'll see deletes before we come across cells we
    * are to delete. Presumption is that the memstore#kvset is processed before
    * memstore#snapshot and so on.
-   * @param kv First possible item on targeted row; i.e. empty columns, latest
-   * timestamp and maximum type.
+   * @param row The row key of the targeted row.
    * @return Found keyvalue or null if none found.
    * @throws IOException
    */
-  KeyValue getRowKeyAtOrBefore(final KeyValue kv) throws IOException {
+  KeyValue getRowKeyAtOrBefore(final byte[] row) throws IOException {
+    // If minVersions is set, we will not ignore expired KVs.
+    // As we're only looking for the latest matches, that should be OK.
+    // With minVersions > 0 we guarantee that any KV that has any version
+    // at all (expired or not) has at least one version that will not expire.
+    // Note that this method used to take a KeyValue as an argument. A KeyValue
+    // can be back-dated, a row key cannot.
+    long ttlToUse = this.minVersions > 0 ? Long.MAX_VALUE : this.ttl;
+
+    KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
+
     GetClosestRowBeforeTracker state = new GetClosestRowBeforeTracker(
-      this.comparator, kv, this.ttl, this.region.getRegionInfo().isMetaRegion());
+      this.comparator, kv, ttlToUse, this.region.getRegionInfo().isMetaRegion());
     this.lock.readLock().lock();
     try {
       // First go to the memstore.  Pick up deletes and candidates.
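
Worth calling out: once minVersions > 0, the flush path above performs
no TTL filtering at all and every expired KeyValue reaches the store
file; expiry is then enforced entirely by compactions, where the
ColumnTracker can count surviving versions per column. Likewise,
getRowKeyAtOrBefore() switches to an effectively infinite TTL, because
minVersions guarantees that any KV with versions at all has at least
one version that will not expire.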

Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1160978&r1=1160977&r2=1160978&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Wed Aug 24 05:15:37 2011
@@ -64,7 +64,7 @@ class StoreScanner implements KeyValueSc
     this.cacheBlocks = scan.getCacheBlocks();
     matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
         columns, store.ttl, store.comparator.getRawComparator(),
-        store.versionsToReturn(scan.getMaxVersions()), 
+        store.minVersions, store.versionsToReturn(scan.getMaxVersions()),
         false);
 
     this.isGet = scan.isGetScan();
@@ -98,7 +98,7 @@ class StoreScanner implements KeyValueSc
     this.cacheBlocks = false;
     this.isGet = false;
     matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
-        null, store.ttl, store.comparator.getRawComparator(),
+        null, store.ttl, store.comparator.getRawComparator(), store.minVersions,
         store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput);
 
     // Seek all scanners to the initial key
@@ -120,7 +120,7 @@ class StoreScanner implements KeyValueSc
     this.isGet = false;
     this.cacheBlocks = scan.getCacheBlocks();
     this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
-        comparator.getRawComparator(), scan.getMaxVersions(), false);
+        comparator.getRawComparator(), 0, scan.getMaxVersions(), false);
 
     // Seek all scanners to the initial key
     for(KeyValueScanner scanner : scanners) {

Modified: hbase/trunk/src/main/ruby/hbase/admin.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/hbase/admin.rb?rev=1160978&r1=1160977&r2=1160978&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/hbase/admin.rb (original)
+++ hbase/trunk/src/main/ruby/hbase/admin.rb Wed Aug 24 05:15:37 2011
@@ -408,6 +408,7 @@ module Hbase
       family.setCompressionType(org.apache.hadoop.hbase.io.hfile.Compression::Algorithm.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::COMPRESSION)
       family.setBlocksize(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE)
       family.setMaxVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS)
+      family.setMinVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS)
       if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER)
         bloomtype = arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER].upcase
         unless org.apache.hadoop.hbase.regionserver.StoreFile::BloomType.constants.include?(bloomtype)      
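
With this one-line hook the new attribute can be set from the HBase
shell like any other per-family key, e.g. (illustrative table and
values): create 't1', {NAME => 'f1', VERSIONS => 1000,
MIN_VERSIONS => 2, TTL => 86400}.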

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java?rev=1160978&r1=1160977&r2=1160978&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java Wed Aug 24 05:15:37 2011
@@ -197,20 +197,33 @@ public abstract class HBaseTestCase exte
    */
   protected HTableDescriptor createTableDescriptor(final String name,
       final int versions) {
+    return createTableDescriptor(name, HColumnDescriptor.DEFAULT_MIN_VERSIONS,
+        versions, HConstants.FOREVER);
+  }
+
+  /**
+   * Create a table of name <code>name</code> with {@link COLUMNS} for
+   * families.
+   * @param name Name to give table.
+   * @param versions How many versions to allow per column.
+   * @return Table descriptor.
+   */
+  protected HTableDescriptor createTableDescriptor(final String name,
+      final int minVersions, final int versions, final int ttl) {
     HTableDescriptor htd = new HTableDescriptor(name);
-    htd.addFamily(new HColumnDescriptor(fam1, versions,
+    htd.addFamily(new HColumnDescriptor(fam1, minVersions, versions,
       HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
-      HColumnDescriptor.DEFAULT_BLOCKSIZE, HConstants.FOREVER,
+      HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
       HColumnDescriptor.DEFAULT_BLOOMFILTER,
       HConstants.REPLICATION_SCOPE_LOCAL));
-    htd.addFamily(new HColumnDescriptor(fam2, versions,
+    htd.addFamily(new HColumnDescriptor(fam2, minVersions, versions,
         HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
-        HColumnDescriptor.DEFAULT_BLOCKSIZE, HConstants.FOREVER,
+        HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
         HColumnDescriptor.DEFAULT_BLOOMFILTER,
         HConstants.REPLICATION_SCOPE_LOCAL));
-    htd.addFamily(new HColumnDescriptor(fam3, versions,
+    htd.addFamily(new HColumnDescriptor(fam3, minVersions, versions,
         HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
-        HColumnDescriptor.DEFAULT_BLOCKSIZE,  HConstants.FOREVER,
+        HColumnDescriptor.DEFAULT_BLOCKSIZE,  ttl,
         HColumnDescriptor.DEFAULT_BLOOMFILTER,
         HConstants.REPLICATION_SCOPE_LOCAL));
     return htd;

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java?rev=1160978&r1=1160977&r2=1160978&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java Wed Aug 24 05:15:37 2011
@@ -44,7 +44,7 @@ public class TestExplicitColumnTracker e
                        List<byte[]> scannerColumns,
                        List<MatchCode> expected) {
     ColumnTracker exp = new ExplicitColumnTracker(
-      trackColumns, maxVersions);
+      trackColumns, 0, maxVersions, Long.MAX_VALUE);
 
 
     //Initialize result
@@ -161,7 +161,8 @@ public class TestExplicitColumnTracker e
       columns.add(Bytes.toBytes("col"+i));
     }
 
-    ColumnTracker explicit = new ExplicitColumnTracker(columns, maxVersions);
+    ColumnTracker explicit = new ExplicitColumnTracker(columns, 0, maxVersions,
+        Long.MAX_VALUE);
     for (int i = 0; i < 100000; i+=2) {
       byte [] col = Bytes.toBytes("col"+i);
       explicit.checkColumn(col, 0, col.length, 1);

Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java?rev=1160978&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java Wed Aug 24 05:15:37 2011
@@ -0,0 +1,423 @@
+/**
+ * Copyright 2011 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.filter.TimestampsFilter;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Test Minimum Versions feature (HBASE-4071).
+ */
+public class TestMinVersions extends HBaseTestCase {
+  private final byte[] T0 = Bytes.toBytes("0");
+  private final byte[] T1 = Bytes.toBytes("1");
+  private final byte[] T2 = Bytes.toBytes("2");
+  private final byte[] T3 = Bytes.toBytes("3");
+  private final byte[] T4 = Bytes.toBytes("4");
+  private final byte[] T5 = Bytes.toBytes("5");
+
+  private final byte[] c0 = COLUMNS[0];
+
+  /**
+   * Verify behavior of getClosestRowBefore(...)
+   */
+  public void testGetClosestBefore() throws Exception {
+    HTableDescriptor htd = createTableDescriptor(getName(), 1, 1000, 1);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    long ts = System.currentTimeMillis() - 2000; // 2s in the past
+
+    Put p = new Put(T1, ts);
+    p.add(c0, c0, T1);
+    region.put(p);
+
+    p = new Put(T1, ts+1);
+    p.add(c0, c0, T4);
+    region.put(p);
+
+    p = new Put(T3, ts);
+    p.add(c0, c0, T3);
+    region.put(p);
+
+    // now make sure that getClosestRowBefore(...) can get
+    // rows that would be expired without minVersions.
+    // also make sure it gets the latest version
+    Result r = region.getClosestRowBefore(T1, c0);
+    checkResult(r, c0, T4);
+
+    r = region.getClosestRowBefore(T2, c0);
+    checkResult(r, c0, T4);
+
+    // now flush/compact
+    region.flushcache();
+    region.compactStores(true);
+
+    r = region.getClosestRowBefore(T1, c0);
+    checkResult(r, c0, T4);
+
+    r = region.getClosestRowBefore(T2, c0);
+    checkResult(r, c0, T4);
+  }
+
+  /**
+   * Test mixed memstore and storefile scanning
+   * with minimum versions.
+   */
+  public void testStoreMemStore() throws Exception {
+    // keep 3 versions minimum
+    HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    long ts = System.currentTimeMillis() - 2000; // 2s in the past
+
+    Put p = new Put(T1, ts-1);
+    p.add(c0, c0, T2);
+    region.put(p);
+
+    p = new Put(T1, ts-3);
+    p.add(c0, c0, T0);
+    region.put(p);
+
+    // now flush/compact
+    region.flushcache();
+    region.compactStores(true);
+
+    p = new Put(T1, ts);
+    p.add(c0, c0, T3);
+    region.put(p);
+
+    p = new Put(T1, ts-2);
+    p.add(c0, c0, T1);
+    region.put(p);
+
+    p = new Put(T1, ts-3);
+    p.add(c0, c0, T0);
+    region.put(p);
+
+    // newest version in the memstore
+    // the 2nd newest in the store file
+    // and the 3rd and 4th newest also in the memstore
+
+    Get g = new Get(T1);
+    g.setMaxVersions();
+    Result r = region.get(g, null); // this'll use ScanWildcardColumnTracker
+    checkResult(r, c0, T3,T2,T1);
+
+    g = new Get(T1);
+    g.setMaxVersions();
+    g.addColumn(c0, c0);
+    r = region.get(g, null);  // this'll use ExplicitColumnTracker
+    checkResult(r, c0, T3,T2,T1);
+  }
+
+  /**
+   * Make sure the Deletes behave as expected with minimum versions
+   */
+  public void testDelete() throws Exception {
+    HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    long ts = System.currentTimeMillis() - 2000; // 2s in the past
+
+    Put p = new Put(T1, ts-2);
+    p.add(c0, c0, T1);
+    region.put(p);
+
+    p = new Put(T1, ts-1);
+    p.add(c0, c0, T2);
+    region.put(p);
+
+    p = new Put(T1, ts);
+    p.add(c0, c0, T3);
+    region.put(p);
+
+    Delete d = new Delete(T1, ts-1, null);
+    region.delete(d, null, true);
+
+    Get g = new Get(T1);
+    g.setMaxVersions();
+    Result r = region.get(g, null);  // this'll use ScanWildcardColumnTracker
+    checkResult(r, c0, T3);
+
+    g = new Get(T1);
+    g.setMaxVersions();
+    g.addColumn(c0, c0);
+    r = region.get(g, null);  // this'll use ExplicitColumnTracker
+    checkResult(r, c0, T3);
+
+    // now flush/compact
+    region.flushcache();
+    region.compactStores(true);
+
+    // try again
+    g = new Get(T1);
+    g.setMaxVersions();
+    r = region.get(g, null);  // this'll use ScanWildcardColumnTracker
+    checkResult(r, c0, T3);
+
+    g = new Get(T1);
+    g.setMaxVersions();
+    g.addColumn(c0, c0);
+    r = region.get(g, null);  // this'll use ExplicitColumnTracker
+    checkResult(r, c0, T3);
+  }
+
+  /**
+   * Make sure the memstore behaves correctly with minimum versions
+   */
+  public void testMemStore() throws Exception {
+    HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    long ts = System.currentTimeMillis() - 2000; // 2s in the past
+
+    // 2nd version
+    Put p = new Put(T1, ts-2);
+    p.add(c0, c0, T2);
+    region.put(p);
+
+    // 3rd version
+    p = new Put(T1, ts-1);
+    p.add(c0, c0, T3);
+    region.put(p);
+
+    // 4th version
+    p = new Put(T1, ts);
+    p.add(c0, c0, T4);
+    region.put(p);
+
+    // now flush/compact
+    region.flushcache();
+    region.compactStores(true);
+
+    // now put the first version (backdated)
+    p = new Put(T1, ts-3);
+    p.add(c0, c0, T1);
+    region.put(p);
+
+    // now the latest change is in the memstore,
+    // but it is not the latest version
+
+    Result r = region.get(new Get(T1), null);
+    checkResult(r, c0, T4);
+
+    Get g = new Get(T1);
+    g.setMaxVersions();
+    r = region.get(g, null); // this'll use ScanWildcardColumnTracker
+    checkResult(r, c0, T4,T3);
+
+    g = new Get(T1);
+    g.setMaxVersions();
+    g.addColumn(c0, c0);
+    r = region.get(g, null);  // this'll use ExplicitColumnTracker
+    checkResult(r, c0, T4,T3);
+
+    p = new Put(T1, ts+1);
+    p.add(c0, c0, T5);
+    region.put(p);
+
+    // now the latest version is in the memstore
+
+    g = new Get(T1);
+    g.setMaxVersions();
+    r = region.get(g, null);  // this'll use ScanWildcardColumnTracker
+    checkResult(r, c0, T5,T4);
+
+    g = new Get(T1);
+    g.setMaxVersions();
+    g.addColumn(c0, c0);
+    r = region.get(g, null);  // this'll use ExplicitColumnTracker
+    checkResult(r, c0, T5,T4);
+  }
+
+  /**
+   * Verify basic minimum versions functionality
+   */
+  public void testBaseCase() throws Exception {
+    // 2 versions minimum, 1000 versions maximum, ttl = 1s
+    HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1);
+    HRegion region = createNewHRegion(htd, null, null);
+
+    long ts = System.currentTimeMillis() - 2000; // 2s in the past
+
+     // 1st version
+    Put p = new Put(T1, ts-3);
+    p.add(c0, c0, T1);
+    region.put(p);
+
+    // 2nd version
+    p = new Put(T1, ts-2);
+    p.add(c0, c0, T2);
+    region.put(p);
+
+    // 3rd version
+    p = new Put(T1, ts-1);
+    p.add(c0, c0, T3);
+    region.put(p);
+
+    // 4th version
+    p = new Put(T1, ts);
+    p.add(c0, c0, T4);
+    region.put(p);
+
+    Result r = region.get(new Get(T1), null);
+    checkResult(r, c0, T4);
+
+    Get g = new Get(T1);
+    g.setTimeRange(0L, ts+1);
+    r = region.get(g, null);
+    checkResult(r, c0, T4);
+
+    // oldest version still exists
+    g.setTimeRange(0L, ts-2);
+    r = region.get(g, null);
+    checkResult(r, c0, T1);
+
+    // gets already see only the retained versions,
+    // even before a compaction removes the rest
+    g = new Get(T1);
+    g.setMaxVersions();
+    r = region.get(g, null); // this'll use ScanWildcardColumnTracker
+    checkResult(r, c0, T4,T3);
+
+    g = new Get(T1);
+    g.setMaxVersions();
+    g.addColumn(c0, c0);
+    r = region.get(g, null);  // this'll use ExplicitColumnTracker
+    checkResult(r, c0, T4,T3);
+
+    // now flush
+    region.flushcache();
+    region.compactStores();
+
+    // oldest version still exists
+    // flushing/minor compactions can't get rid of these anymore
+    g = new Get(T1);
+    g.setTimeRange(0L, ts-2);
+    r = region.get(g, null);
+    checkResult(r, c0, T1);
+
+    // major compaction
+    region.compactStores(true);
+
+    // after compaction the 4th version is still available
+    g = new Get(T1);
+    g.setTimeRange(0L, ts+1);
+    r = region.get(g, null);
+    checkResult(r, c0, T4);
+
+    // so is the 3rd
+    g.setTimeRange(0L, ts);
+    r = region.get(g, null);
+    checkResult(r, c0, T3);
+
+    // but the 2nd and earlier versions are gone
+    g.setTimeRange(0L, ts-1);
+    r = region.get(g, null);
+    assertTrue(r.isEmpty());
+  }
+
+  /**
+   * Verify that basic filters still behave correctly with
+   * minimum versions enabled.
+   */
+  public void testFilters() throws Exception {
+    HTableDescriptor htd = createTableDescriptor(getName(), 2, 1000, 1);
+    HRegion region = createNewHRegion(htd, null, null);
+    final byte [] c1 = COLUMNS[1];
+
+    long ts = System.currentTimeMillis() - 2000; // 2s in the past
+
+    Put p = new Put(T1, ts-3);
+    p.add(c0, c0, T0);
+    p.add(c1, c1, T0);
+    region.put(p);
+
+    p = new Put(T1, ts-2);
+    p.add(c0, c0, T1);
+    p.add(c1, c1, T1);
+    region.put(p);
+
+    p = new Put(T1, ts-1);
+    p.add(c0, c0, T2);
+    p.add(c1, c1, T2);
+    region.put(p);
+
+    p = new Put(T1, ts);
+    p.add(c0, c0, T3);
+    p.add(c1, c1, T3);
+    region.put(p);
+
+    List<Long> tss = new ArrayList<Long>();
+    tss.add(ts-1);
+    tss.add(ts-2);
+
+    Get g = new Get(T1);
+    g.addColumn(c1,c1);
+    g.setFilter(new TimestampsFilter(tss));
+    g.setMaxVersions();
+    Result r = region.get(g, null);
+    checkResult(r, c1, T2,T1);
+
+    g = new Get(T1);
+    g.addColumn(c0,c0);
+    g.setFilter(new TimestampsFilter(tss));
+    g.setMaxVersions();
+    r = region.get(g, null);
+    checkResult(r, c0, T2,T1);
+
+    // now flush/compact
+    region.flushcache();
+    region.compactStores(true);
+
+    g = new Get(T1);
+    g.addColumn(c1,c1);
+    g.setFilter(new TimestampsFilter(tss));
+    g.setMaxVersions();
+    r = region.get(g, null);
+    checkResult(r, c1, T2);
+
+    g = new Get(T1);
+    g.addColumn(c0,c0);
+    g.setFilter(new TimestampsFilter(tss));
+    g.setMaxVersions();
+    r = region.get(g, null);
+    checkResult(r, c0, T2);
+  }
+
+  private void checkResult(Result r, byte[] col, byte[] ... vals) {
+    assertEquals(vals.length, r.size());
+    List<KeyValue> kvs = r.getColumn(col, col);
+    assertEquals(vals.length, kvs.size());
+    for (int i=0;i<vals.length;i++) {
+      assertTrue(Bytes.equals(vals[i], kvs.get(i).getValue()));
+    }
+    }
+  }
+}

Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java?rev=1160978&r1=1160977&r2=1160978&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java Wed Aug 24 05:15:37 2011
@@ -33,7 +33,7 @@ public class TestScanWildcardColumnTrack
 
   public void testCheckColumn_Ok() {
     ScanWildcardColumnTracker tracker =
-      new ScanWildcardColumnTracker(VERSIONS);
+      new ScanWildcardColumnTracker(0, VERSIONS, Long.MAX_VALUE);
 
     //Create list of qualifiers
     List<byte[]> qualifiers = new ArrayList<byte[]>();
@@ -65,7 +65,7 @@ public class TestScanWildcardColumnTrack
 
   public void testCheckColumn_EnforceVersions() {
     ScanWildcardColumnTracker tracker =
-      new ScanWildcardColumnTracker(VERSIONS);
+      new ScanWildcardColumnTracker(0, VERSIONS, Long.MAX_VALUE);
 
     //Create list of qualifiers
     List<byte[]> qualifiers = new ArrayList<byte[]>();
@@ -98,7 +98,7 @@ public class TestScanWildcardColumnTrack
 
   public void DisabledTestCheckColumn_WrongOrder() {
     ScanWildcardColumnTracker tracker =
-      new ScanWildcardColumnTracker(VERSIONS);
+      new ScanWildcardColumnTracker(0, VERSIONS, Long.MAX_VALUE);
 
     //Create list of qualifiers
     List<byte[]> qualifiers = new ArrayList<byte[]>();