Posted to commits@hbase.apache.org by ap...@apache.org on 2014/12/05 21:00:58 UTC

[1/3] hbase git commit: HBASE-10560 Per cell TTLs

Repository: hbase
Updated Branches:
  refs/heads/0.98 45c8be3fc -> 869c5665c
  refs/heads/branch-1 1bd27bfa2 -> 004e977ba
  refs/heads/master f83e25e18 -> 09cd3d7bf


HBASE-10560 Per cell TTLs


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/09cd3d7b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/09cd3d7b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/09cd3d7b

Branch: refs/heads/master
Commit: 09cd3d7bfb9f4c4b279a74441d4059da6b8177d2
Parents: f83e25e
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Dec 5 10:59:56 2014 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Dec 5 11:10:26 2014 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/Append.java  |   5 +
 .../org/apache/hadoop/hbase/client/Delete.java  |   5 +
 .../apache/hadoop/hbase/client/Increment.java   |   5 +
 .../apache/hadoop/hbase/client/Mutation.java    |  34 +++
 .../org/apache/hadoop/hbase/client/Put.java     |   5 +
 .../java/org/apache/hadoop/hbase/TagType.java   |   1 +
 .../hadoop/hbase/mapreduce/CellCreator.java     |  35 ++-
 .../hadoop/hbase/mapreduce/ImportTsv.java       |  55 ++++-
 .../hadoop/hbase/mapreduce/TextSortReducer.java |  24 +-
 .../hbase/mapreduce/TsvImporterMapper.java      |  26 ++-
 .../GetClosestRowBeforeTracker.java             |  25 ++-
 .../hadoop/hbase/regionserver/HRegion.java      | 224 ++++++++++++++-----
 .../hadoop/hbase/regionserver/HStore.java       |  36 ++-
 .../hbase/regionserver/ScanQueryMatcher.java    |  26 ++-
 .../hadoop/hbase/regionserver/StoreScanner.java |  12 +-
 .../hbase/mapreduce/TestImportTSVWithTTLs.java  | 173 ++++++++++++++
 .../hadoop/hbase/regionserver/TestHRegion.java  | 131 +++++++++++
 .../hbase/regionserver/TestQueryMatcher.java    |  18 +-
 .../TestScanWildcardColumnTracker.java          |   3 -
 .../hadoop/hbase/regionserver/TestTags.java     |  18 +-
 hbase-shell/src/main/ruby/hbase/table.rb        |  24 +-
 hbase-shell/src/test/ruby/hbase/table_test.rb   |  13 ++
 22 files changed, 793 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
index 9e3be3e..d5a4552 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
@@ -171,4 +171,9 @@ public class Append extends Mutation {
   public Append setACL(Map<String, Permission> perms) {
     return (Append) super.setACL(perms);
   }
+
+  @Override
+  public Append setTTL(long ttl) {
+    return (Append) super.setTTL(ttl);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
index ea76d0d..d947ef8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
@@ -461,4 +461,9 @@ public class Delete extends Mutation implements Comparable<Row> {
   public Delete setACL(Map<String, Permission> perms) {
     return (Delete) super.setACL(perms);
   }
+
+  @Override
+  public Delete setTTL(long ttl) {
+    throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
index b12ee02..b6e6a52 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
@@ -317,4 +317,9 @@ public class Increment extends Mutation implements Comparable<Row> {
   public Increment setACL(Map<String, Permission> perms) {
     return (Increment) super.setACL(perms);
   }
+
+  @Override
+  public Increment setTTL(long ttl) {
+    return (Increment) super.setTTL(ttl);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index b7f7d1b..28284e5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -76,6 +76,11 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
    */
   private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
 
+  /**
+   * The attribute for storing TTL for the result of the mutation.
+   */
+  private static final String OP_ATTRIBUTE_TTL = "_ttl";
+
   protected byte [] row = null;
   protected long ts = HConstants.LATEST_TIMESTAMP;
   protected Durability durability = Durability.USE_DEFAULT;
@@ -199,6 +204,12 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
     if (getId() != null) {
       map.put("id", getId());
     }
+    // Add the TTL if set
+    // Long.MAX_VALUE is the default, and is interpreted to mean this attribute
+    // has not been set.
+    if (getTTL() != Long.MAX_VALUE) {
+      map.put("ttl", getTTL());
+    }
     return map;
   }
 
@@ -417,6 +428,29 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
   }
 
   /**
+   * Return the TTL requested for the result of the mutation, in milliseconds.
+   * @return the TTL requested for the result of the mutation, in milliseconds,
+   * or Long.MAX_VALUE if unset
+   */
+  public long getTTL() {
+    byte[] ttlBytes = getAttribute(OP_ATTRIBUTE_TTL);
+    if (ttlBytes != null) {
+      return Bytes.toLong(ttlBytes);
+    }
+    return Long.MAX_VALUE;
+  }
+
+  /**
+   * Set the TTL desired for the result of the mutation, in milliseconds.
+   * @param ttl the TTL desired for the result of the mutation, in milliseconds
+   * @return this
+   */
+  public Mutation setTTL(long ttl) {
+    setAttribute(OP_ATTRIBUTE_TTL, Bytes.toBytes(ttl));
+    return this;
+  }
+
+  /**
    * Subclasses should override this method to add the heap size of their own fields.
    * @return the heap size to add (will be aligned).
    */
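
For context, a minimal client-side sketch of the new per-mutation TTL API added
above (the table handle and the "f"/"q" column names are hypothetical). TTLs
are expressed in milliseconds, and Long.MAX_VALUE means "unset":

import java.io.IOException;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

void putWithTtl(HTableInterface table) throws IOException {
  Put put = new Put(Bytes.toBytes("row1"));
  put.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("value"));
  put.setTTL(5000L);             // result expires 5 seconds after its timestamp
  assert put.getTTL() == 5000L;  // getTTL() returns Long.MAX_VALUE when unset
  table.put(put);
}

Note that Delete overrides setTTL to throw UnsupportedOperationException (see
the Delete.java hunk above), while Append, Increment and Put merely narrow the
return type.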

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
index f0d3a72..b9d652d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
@@ -460,4 +460,9 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
   public Put setACL(Map<String, Permission> perms) {
     return (Put) super.setACL(perms);
   }
+
+  @Override
+  public Put setTTL(long ttl) {
+    return (Put) super.setTTL(ttl);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index f088dcf..2095b7a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -30,4 +30,5 @@ public final class TagType {
   public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
   // String based tag type used in replication
   public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
+  public static final byte TTL_TAG_TYPE = (byte)8;
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
index b3dfee7..001f64d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
@@ -69,7 +69,7 @@ public class CellCreator {
       byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
       int vlength) throws IOException {
     return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength,
-        timestamp, value, voffset, vlength, null);
+        timestamp, value, voffset, vlength, (List<Tag>)null);
   }
 
   /**
@@ -90,6 +90,7 @@ public class CellCreator {
    * @return created Cell
    * @throws IOException
    */
+  @Deprecated
   public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
       byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
       int vlength, String visExpression) throws IOException {
@@ -100,4 +101,36 @@ public class CellCreator {
     return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
         qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags);
   }
+
+  /**
+   * @param row row key
+   * @param roffset row offset
+   * @param rlength row length
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   * @param timestamp version timestamp
+   * @param value column value
+   * @param voffset value offset
+   * @param vlength value length
+   * @param tags list of tags to attach to the created Cell (may be null)
+   * @return created Cell
+   * @throws IOException
+   */
+  public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+      byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+      int vlength, List<Tag> tags) throws IOException {
+    return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
+        qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags);
+  }
+
+  /**
+   * @return Visibility expression resolver
+   */
+  public VisibilityExpressionResolver getVisibilityExpressionResolver() {
+    return this.visExpResolver;
+  }
 }
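
A sketch of the bulk load path using the new create overload (hypothetical row,
family, qualifier and value names; assume kvCreator was constructed from the
job Configuration as in TsvImporterMapper). Because the TTL rides on the
KeyValue itself as a tag, cells packed into the same Put can carry different
TTLs:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.util.Bytes;

Cell cellWithTtl(CellCreator kvCreator) throws IOException {
  byte[] row = Bytes.toBytes("row1"), fam = Bytes.toBytes("f"),
      qual = Bytes.toBytes("q"), value = Bytes.toBytes("v");
  List<Tag> tags = new ArrayList<Tag>();
  tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L))); // 5s, in ms
  return kvCreator.create(row, 0, row.length, fam, 0, fam.length,
      qual, 0, qual.length, System.currentTimeMillis(),
      value, 0, value.length, tags);
}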

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index 7ae6599..54e0034 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -125,13 +125,20 @@ public class ImportTsv extends Configured implements Tool {
 
     public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
 
+    public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL";
+
     private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
 
     public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
 
     public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
 
+    public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1;
+
     private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+
+    private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX;
+
     /**
      * @param columnsSpecification the list of columns to parser out, comma separated.
      * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
@@ -162,14 +169,18 @@ public class ImportTsv extends Configured implements Tool {
           timestampKeyColumnIndex = i;
           continue;
         }
-        if(ATTRIBUTES_COLUMN_SPEC.equals(str)) {
+        if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
           attrKeyColumnIndex = i;
           continue;
         }
-        if(CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
+        if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
           cellVisibilityColumnIndex = i;
           continue;
         }
+        if (CELL_TTL_COLUMN_SPEC.equals(str)) {
+          cellTTLColumnIndex = i;
+          continue;
+        }
         String[] parts = str.split(":", 2);
         if (parts.length == 1) {
           families[i] = str.getBytes();
@@ -197,6 +208,10 @@ public class ImportTsv extends Configured implements Tool {
       return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
     }
 
+    public boolean hasCellTTL() {
+      return cellTTLColumnIndex != DEFAULT_CELL_TTL_COLUMN_INDEX;
+    }
+
     public int getAttributesKeyColumnIndex() {
       return attrKeyColumnIndex;
     }
@@ -204,9 +219,15 @@ public class ImportTsv extends Configured implements Tool {
     public int getCellVisibilityColumnIndex() {
       return cellVisibilityColumnIndex;
     }
+
+    public int getCellTTLColumnIndex() {
+      return cellTTLColumnIndex;
+    }
+
     public int getRowKeyColumnIndex() {
       return rowKeyColumnIndex;
     }
+
     public byte[] getFamily(int idx) {
       return families[idx];
     }
@@ -238,8 +259,10 @@ public class ImportTsv extends Configured implements Tool {
         throw new BadTsvLineException("No timestamp");
       } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
         throw new BadTsvLineException("No attributes specified");
-      } else if(hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
+      } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
         throw new BadTsvLineException("No cell visibility specified");
+      } else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) {
+        throw new BadTsvLineException("No cell TTL specified");
       }
       return new ParsedLine(tabOffsets, lineBytes);
     }
@@ -336,6 +359,31 @@ public class ImportTsv extends Configured implements Tool {
         }
       }
 
+      public int getCellTTLColumnOffset() {
+        if (hasCellTTL()) {
+          return getColumnOffset(cellTTLColumnIndex);
+        } else {
+          return DEFAULT_CELL_TTL_COLUMN_INDEX;
+        }
+      }
+
+      public int getCellTTLColumnLength() {
+        if (hasCellTTL()) {
+          return getColumnLength(cellTTLColumnIndex);
+        } else {
+          return DEFAULT_CELL_TTL_COLUMN_INDEX;
+        }
+      }
+
+      public long getCellTTL() {
+        if (!hasCellTTL()) {
+          return 0;
+        } else {
+          return Bytes.toLong(lineBytes, getColumnOffset(cellTTLColumnIndex),
+              getColumnLength(cellTTLColumnIndex));
+        }
+      }
+
       public int getColumnOffset(int idx) {
         if (idx > 0)
           return tabOffsets.get(idx - 1) + 1;
@@ -535,6 +583,7 @@ public class ImportTsv extends Configured implements Tool {
       if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
           || TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
           || TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
+          || TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn)
           || TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
         continue;
       // we are only concerned with the first one (in case this is a cf:cq)
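
A sketch of driving the import with the new column spec (the table name, input
path and f:a column are hypothetical); the HBASE_CELL_TTL column of each TSV
row supplies the cell TTL for that row, in milliseconds:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.mapreduce.ImportTsv;
import org.apache.hadoop.util.ToolRunner;

public static void main(String[] args) throws Exception {
  Configuration conf = HBaseConfiguration.create();
  String[] argv = new String[] {
      "-Dimporttsv.columns=HBASE_ROW_KEY,f:a,HBASE_CELL_TTL",
      "mytable", "/path/to/input" };
  System.exit(ToolRunner.run(conf, new ImportTsv(), argv)); // 0 on success
}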

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
index 4a0e0fd..b3981a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -28,8 +30,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -62,6 +67,9 @@ public class TextSortReducer extends
   /** Cell visibility expr **/
   private String cellVisibilityExpr;
 
+  /** Cell TTL */
+  private long ttl;
+
   private CellCreator kvCreator;
 
   public long getTs() {
@@ -148,18 +156,30 @@ public class TextSortReducer extends
           // Retrieve timestamp if exists
           ts = parsed.getTimestamp(ts);
           cellVisibilityExpr = parsed.getCellVisibility();
+          ttl = parsed.getCellTTL();
 
           for (int i = 0; i < parsed.getColumnCount(); i++) {
             if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
-                || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
+                || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
+                || i == parser.getCellTTLColumnIndex()) {
               continue;
             }
             // Creating the KV which needs to be directly written to HFiles. Using the Facade
             // KVCreator for creation of kvs.
+            List<Tag> tags = new ArrayList<Tag>();
+            if (cellVisibilityExpr != null) {
+              tags.addAll(kvCreator.getVisibilityExpressionResolver()
+                .createVisibilityExpTags(cellVisibilityExpr));
+            }
+            // Add TTL directly to the KV so we can vary them when packing more than one KV
+            // into puts
+            if (ttl > 0) {
+              tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+            }
             Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
                 parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
                 parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
-                parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
+                parsed.getColumnOffset(i), parsed.getColumnLength(i), tags);
             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
             kvs.add(kv);
             curSize += kv.heapSize();

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
index ff84081..270de75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
@@ -18,17 +18,22 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
 import org.apache.hadoop.hbase.security.visibility.CellVisibility;
 import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Counter;
@@ -59,6 +64,8 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
 
   protected String cellVisibilityExpr;
 
+  protected long ttl;
+
   protected CellCreator kvCreator;
 
   private String hfileOutPath;
@@ -144,11 +151,13 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
       // Retrieve timestamp if exists
       ts = parsed.getTimestamp(ts);
       cellVisibilityExpr = parsed.getCellVisibility();
+      ttl = parsed.getCellTTL();
 
       Put put = new Put(rowKey.copyBytes());
       for (int i = 0; i < parsed.getColumnCount(); i++) {
         if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
-            || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
+            || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
+            || i == parser.getCellTTLColumnIndex()) {
           continue;
         }
         populatePut(lineBytes, parsed, put, i);
@@ -192,13 +201,26 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
         // the validation
         put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
       }
+      if (ttl > 0) {
+        put.setTTL(ttl);
+      }
     } else {
       // Creating the KV which needs to be directly written to HFiles. Using the Facade
       // KVCreator for creation of kvs.
+      List<Tag> tags = new ArrayList<Tag>();
+      if (cellVisibilityExpr != null) {
+        tags.addAll(kvCreator.getVisibilityExpressionResolver()
+          .createVisibilityExpTags(cellVisibilityExpr));
+      }
+      // Add TTL directly to the KV so we can vary them when packing more than one KV
+      // into puts
+      if (ttl > 0) {
+        tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+      }
       cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
           parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
           parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
-          parsed.getColumnLength(i), cellVisibilityExpr);
+          parsed.getColumnLength(i), tags);
     }
     put.add(cell);
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
index fce3b29..4d22c0e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}.
- * Like {@link ScanDeleteTracker} and {@link ScanDeleteTracker} but does not
+ * Like {@link ScanQueryMatcher} and {@link ScanDeleteTracker} but does not
  * implement the {@link DeleteTracker} interface since state spans rows (There
  * is no update nor reset method).
  */
@@ -42,7 +42,8 @@ import org.apache.hadoop.hbase.util.Bytes;
 class GetClosestRowBeforeTracker {
   private final KeyValue targetkey;
   // Any cell w/ a ts older than this is expired.
-  private final long oldestts;
+  private final long now;
+  private final long oldestUnexpiredTs;
   private Cell candidate = null;
   private final KVComparator kvcomparator;
   // Flag for whether we're doing getclosest on a metaregion.
@@ -75,19 +76,12 @@ class GetClosestRowBeforeTracker {
         HConstants.DELIMITER) - this.rowoffset;
     }
     this.tablenamePlusDelimiterLength = metaregion? l + 1: -1;
-    this.oldestts = System.currentTimeMillis() - ttl;
+    this.now = System.currentTimeMillis();
+    this.oldestUnexpiredTs = now - ttl;
     this.kvcomparator = c;
     this.deletes = new TreeMap<Cell, NavigableSet<Cell>>(new CellComparator.RowComparator());
   }
 
-  /**
-   * @param kv
-   * @return True if this <code>kv</code> is expired.
-   */
-  boolean isExpired(final Cell kv) {
-    return HStore.isExpired(kv, this.oldestts);
-  }
-
   /*
    * Add the specified KeyValue to the list of deletes.
    * @param kv
@@ -172,6 +166,15 @@ class GetClosestRowBeforeTracker {
     return false;
   }
 
+  /**
+   * @param cell
+   * @return true if the cell is expired
+   */
+  public boolean isExpired(final Cell cell) {
+    return cell.getTimestamp() < this.oldestUnexpiredTs ||
+      HStore.isCellTTLExpired(cell, this.oldestUnexpiredTs, this.now);
+  }
+
   /*
    * Handle keys whose values hold deletes.
    * Add to the set of deletes and then if the candidate keys contain any that

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 1007a88..f7243d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -85,6 +85,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Append;
@@ -2642,6 +2644,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
           noOfDeletes++;
         }
+        rewriteCellTags(familyMaps[i], mutation);
       }
 
       lock(this.updatesLock.readLock(), numReadyToWrite);
@@ -3116,6 +3119,59 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
   }
 
+  /**
+   * Possibly rewrite incoming cell tags.
+   */
+  void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
+    // Check if we have any work to do and early out otherwise
+    // Update these checks as more logic is added here
+
+    if (m.getTTL() == Long.MAX_VALUE) {
+      return;
+    }
+
+    // From this point we know we have some work to do
+
+    for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
+      List<Cell> cells = e.getValue();
+      assert cells instanceof RandomAccess;
+      int listSize = cells.size();
+      for (int i = 0; i < listSize; i++) {
+        Cell cell = cells.get(i);
+        List<Tag> newTags = new ArrayList<Tag>();
+        Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
+          cell.getTagsOffset(), cell.getTagsLength());
+
+        // Carry forward existing tags
+
+        while (tagIterator.hasNext()) {
+
+          // Add any filters or tag specific rewrites here
+
+          newTags.add(tagIterator.next());
+        }
+
+        // Cell TTL handling
+
+        // Check again if we need to add a cell TTL because early out logic
+        // above may change when there are more tag based features in core.
+        if (m.getTTL() != Long.MAX_VALUE) {
+          // Add a cell TTL tag
+          newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
+        }
+
+        // Rewrite the cell with the updated set of tags
+
+        cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+          cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+          cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+          cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
+          cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+          newTags));
+      }
+    }
+  }
+
   /*
    * Check if resources to support an update.
    *
@@ -5213,6 +5269,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           processor.preBatchMutate(this, walEdit);
           // 7. Apply to memstore
           for (Mutation m : mutations) {
+            // Handle any tag based cell features
+            rewriteCellTags(m.getFamilyCellMap(), m);
+
             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
               Cell cell = cellScanner.current();
               CellUtil.setSequenceId(cell, mvccNum);
@@ -5351,8 +5410,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
 
-  // TODO: There's a lot of boiler plate code identical
-  // to increment... See how to better unify that.
+  // TODO: There's a lot of boilerplate code identical to increment.
+  // We should refactor append and increment as local get-mutate-put
+  // transactions, so all stores only go through one code path for puts.
   /**
    * Perform one or more append operations on a row.
    *
@@ -5422,8 +5482,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             // Iterate the input columns and update existing values if they were
             // found, otherwise add new column initialized to the append value
 
-            // Avoid as much copying as possible. Every byte is copied at most
-            // once.
+            // Avoid as much copying as possible. We may need to rewrite and
+            // consolidate tags. Bytes are only copied once.
             // Would be nice if KeyValue had scatter/gather logic
             int idx = 0;
             for (Cell cell : family.getValue()) {
@@ -5433,40 +5493,87 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
                   && CellUtil.matchingQualifier(results.get(idx), cell)) {
                 oldCell = results.get(idx);
                 long ts = Math.max(now, oldCell.getTimestamp());
-                // allocate an empty kv once
+
+                // Process cell tags
+                List<Tag> newTags = new ArrayList<Tag>();
+
+                // Make a union of the set of tags in the old and new KVs
+
+                if (oldCell.getTagsLength() > 0) {
+                  Iterator<Tag> i = CellUtil.tagsIterator(oldCell.getTagsArray(),
+                    oldCell.getTagsOffset(), oldCell.getTagsLength());
+                  while (i.hasNext()) {
+                    newTags.add(i.next());
+                  }
+                }
+                if (cell.getTagsLength() > 0) {
+                  Iterator<Tag> i  = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+                    cell.getTagsLength());
+                  while (i.hasNext()) {
+                    newTags.add(i.next());
+                  }
+                }
+
+                // Cell TTL handling
+
+                if (append.getTTL() != Long.MAX_VALUE) {
+                  // Add the new TTL tag
+                  newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+                }
+
+                // Rebuild tags
+                byte[] tagBytes = Tag.fromList(newTags);
+
+                // allocate an empty cell once
                 newCell = new KeyValue(row.length, cell.getFamilyLength(),
                     cell.getQualifierLength(), ts, KeyValue.Type.Put,
                     oldCell.getValueLength() + cell.getValueLength(),
-                    oldCell.getTagsLength() + cell.getTagsLength());
-                // copy in the value
-                System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
-                    newCell.getValueArray(), newCell.getValueOffset(),
-                    oldCell.getValueLength());
-                System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
-                    newCell.getValueArray(),
-                    newCell.getValueOffset() + oldCell.getValueLength(),
-                    cell.getValueLength());
-                // copy in the tags
-                System.arraycopy(oldCell.getTagsArray(), oldCell.getTagsOffset(),
-                    newCell.getTagsArray(), newCell.getTagsOffset(), oldCell.getTagsLength());
-                System.arraycopy(cell.getTagsArray(), cell.getTagsOffset(), newCell.getTagsArray(),
-                    newCell.getTagsOffset() + oldCell.getTagsLength(), cell.getTagsLength());
+                    tagBytes.length);
                 // copy in row, family, and qualifier
                 System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
-                    newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
+                  newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
                 System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
-                    newCell.getFamilyArray(), newCell.getFamilyOffset(),
-                    cell.getFamilyLength());
+                  newCell.getFamilyArray(), newCell.getFamilyOffset(),
+                  cell.getFamilyLength());
                 System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
-                    newCell.getQualifierArray(), newCell.getQualifierOffset(),
-                    cell.getQualifierLength());
+                  newCell.getQualifierArray(), newCell.getQualifierOffset(),
+                  cell.getQualifierLength());
+                // copy in the value
+                System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
+                  newCell.getValueArray(), newCell.getValueOffset(),
+                  oldCell.getValueLength());
+                System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
+                  newCell.getValueArray(),
+                  newCell.getValueOffset() + oldCell.getValueLength(),
+                  cell.getValueLength());
+                // Copy in tag data
+                System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
+                  tagBytes.length);
                 idx++;
               } else {
-                // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
-                // so only need to update the timestamp to 'now'
+                // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP
                 CellUtil.updateLatestStamp(cell, now);
-                newCell = cell;
-             }
+
+                // Cell TTL handling
+
+                if (append.getTTL() != Long.MAX_VALUE) {
+                  List<Tag> newTags = new ArrayList<Tag>(1);
+                  newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+                  // Add the new TTL tag
+                  newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
+                      cell.getRowLength(),
+                    cell.getFamilyArray(), cell.getFamilyOffset(),
+                      cell.getFamilyLength(),
+                    cell.getQualifierArray(), cell.getQualifierOffset(),
+                      cell.getQualifierLength(),
+                    cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
+                    cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                    newTags);
+                } else {
+                  newCell = cell;
+                }
+              }
+
               CellUtil.setSequenceId(newCell, mvccNum);
               // Give coprocessors a chance to update the new cell
               if (coprocessorHost != null) {
@@ -5570,6 +5677,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
 
+  // TODO: There's a lot of boilerplate code identical to append.
+  // We should refactor append and increment as local get-mutate-put
+  // transactions, so all stores only go through one code path for puts.
   /**
    * Perform one or more increment operations on a row.
    * @return new keyvalues after increment
@@ -5642,13 +5752,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             // Iterate the input columns and update existing values if they were
             // found, otherwise add new column initialized to the increment amount
             int idx = 0;
-            for (Cell kv: family.getValue()) {
-              long amount = Bytes.toLong(CellUtil.cloneValue(kv));
+            for (Cell cell: family.getValue()) {
+              long amount = Bytes.toLong(CellUtil.cloneValue(cell));
               boolean noWriteBack = (amount == 0);
+              List<Tag> newTags = new ArrayList<Tag>();
+
+              // Carry forward any tags that might have been added by a coprocessor
+              if (cell.getTagsLength() > 0) {
+                Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
+                  cell.getTagsOffset(), cell.getTagsLength());
+                while (i.hasNext()) {
+                  newTags.add(i.next());
+                }
+              }
 
               Cell c = null;
               long ts = now;
-              if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
+              if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
                 c = results.get(idx);
                 ts = Math.max(now, c.getTimestamp());
                 if(c.getValueLength() == Bytes.SIZEOF_LONG) {
@@ -5658,32 +5778,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
                   throw new org.apache.hadoop.hbase.DoNotRetryIOException(
                       "Attempted to increment field that isn't 64 bits wide");
                 }
+                // Carry tags forward from previous version
+                if (c.getTagsLength() > 0) {
+                  Iterator<Tag> i = CellUtil.tagsIterator(c.getTagsArray(),
+                    c.getTagsOffset(), c.getTagsLength());
+                  while (i.hasNext()) {
+                    newTags.add(i.next());
+                  }
+                }
                 idx++;
               }
 
               // Append new incremented KeyValue to list
-              byte[] q = CellUtil.cloneQualifier(kv);
+              byte[] q = CellUtil.cloneQualifier(cell);
               byte[] val = Bytes.toBytes(amount);
-              int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
-              int incCellTagsLen = kv.getTagsLength();
-              Cell newKV = new KeyValue(row.length, family.getKey().length, q.length, ts,
-                  KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
-              System.arraycopy(row, 0, newKV.getRowArray(), newKV.getRowOffset(), row.length);
-              System.arraycopy(family.getKey(), 0, newKV.getFamilyArray(), newKV.getFamilyOffset(),
-                  family.getKey().length);
-              System.arraycopy(q, 0, newKV.getQualifierArray(), newKV.getQualifierOffset(), q.length);
-              // copy in the value
-              System.arraycopy(val, 0, newKV.getValueArray(), newKV.getValueOffset(), val.length);
-              // copy tags
-              if (oldCellTagsLen > 0) {
-                System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getTagsArray(),
-                    newKV.getTagsOffset(), oldCellTagsLen);
-              }
-              if (incCellTagsLen > 0) {
-                System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
-                    newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
+
+              // Add the TTL tag if the mutation carried one
+              if (increment.getTTL() != Long.MAX_VALUE) {
+                newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
               }
+
+              Cell newKV = new KeyValue(row, 0, row.length,
+                family.getKey(), 0, family.getKey().length,
+                q, 0, q.length,
+                ts,
+                KeyValue.Type.Put,
+                val, 0, val.length,
+                newTags);
+
               CellUtil.setSequenceId(newKV, mvccNum);
+
               // Give coprocessors a chance to update the new cell
               if (coprocessorHost != null) {
                 newKV = coprocessorHost.postMutationBeforeWAL(
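
The append and increment hunks above materialize a mutation-level TTL as a
TTL_TAG_TYPE tag on the result cell. A minimal client sketch (the table handle
and column names are hypothetical):

import java.io.IOException;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Increment;
import org.apache.hadoop.hbase.util.Bytes;

void incrementWithTtl(HTableInterface table) throws IOException {
  Increment inc = new Increment(Bytes.toBytes("row1"));
  inc.addColumn(Bytes.toBytes("f"), Bytes.toBytes("ctr"), 1L);
  inc.setTTL(10000L); // the new counter cell carries a 10 second TTL tag
  table.increment(inc);
}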

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 3811142..98b79fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
@@ -1674,8 +1676,38 @@ public class HStore implements Store {
     return wantedVersions > maxVersions ? maxVersions: wantedVersions;
   }
 
-  static boolean isExpired(final Cell key, final long oldestTimestamp) {
-    return key.getTimestamp() < oldestTimestamp;
+  /**
+   * @param cell the cell to check
+   * @param oldestTimestamp the oldest allowed cell timestamp, per the family TTL
+   * @return true if the cell is expired by a TTL tag, as of {@code now}
+   */
+  static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
+    // Do not create an Iterator or Tag objects unless the cell actually has
+    // tags
+    if (cell.getTagsLength() > 0) {
+      // Look for a TTL tag first. Use it instead of the family setting if
+      // found. If a cell has multiple TTLs, resolve the conflict by using the
+      // first tag encountered.
+      Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+        cell.getTagsLength());
+      while (i.hasNext()) {
+        Tag t = i.next();
+        if (TagType.TTL_TAG_TYPE == t.getType()) {
+          // Unlike the family TTL in the schema, cell TTLs are stored in
+          // milliseconds, so there is no need to convert
+          long ts = cell.getTimestamp();
+          assert t.getTagLength() == Bytes.SIZEOF_LONG;
+          long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
+          if (ts + ttl < now) {
+            return true;
+          }
+          // Per cell TTLs cannot extend lifetime beyond family settings, so
+          // fall through to check that
+          break;
+        }
+      }
+    }
+    return false;
   }
 
   @Override
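
Putting the two checks together, a cell is expired if it predates the family
TTL cutoff or if its own TTL tag has lapsed, so a per-cell TTL can shorten but
never extend the family setting. An illustrative fragment mirroring
GetClosestRowBeforeTracker.isExpired above (isCellTTLExpired is package-private
to the regionserver package; familyTtl stands in for the store's configured TTL
in milliseconds):

long now = System.currentTimeMillis();
long oldestUnexpiredTs = now - familyTtl;
boolean expired = cell.getTimestamp() < oldestUnexpiredTs        // family TTL
    || HStore.isCellTTLExpired(cell, oldestUnexpiredTs, now);    // per-cell TTL tag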

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index 56e6b8a..c46da2a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -102,6 +102,10 @@ public class ScanQueryMatcher {
   private final long earliestPutTs;
   private final long ttl;
 
+  /** The oldest timestamp we are interested in, based on TTL */
+  private final long oldestUnexpiredTS;
+  private final long now;
+
   /** readPoint over which the KVs are unconditionally included */
   protected long maxReadPointToTrackVersions;
 
@@ -154,7 +158,7 @@ public class ScanQueryMatcher {
    */
   public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
       ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
-      RegionCoprocessorHost regionCoprocessorHost) throws IOException {
+      long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
     this.tr = scan.getTimeRange();
     this.rowComparator = scanInfo.getComparator();
     this.regionCoprocessorHost = regionCoprocessorHost;
@@ -164,6 +168,9 @@ public class ScanQueryMatcher {
         scanInfo.getFamily());
     this.filter = scan.getFilter();
     this.earliestPutTs = earliestPutTs;
+    this.oldestUnexpiredTS = oldestUnexpiredTS;
+    this.now = now;
+
     this.maxReadPointToTrackVersions = readPointToUse;
     this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
     this.ttl = oldestUnexpiredTS;
@@ -218,18 +225,18 @@ public class ScanQueryMatcher {
    * @param scanInfo The store's immutable scan info
    * @param columns
    * @param earliestPutTs Earliest put seen in any of the store files.
-   * @param oldestUnexpiredTS the oldest timestamp we are interested in,
-   *  based on TTL
+   * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
+   * @param now the current server time
    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
    * @param regionCoprocessorHost 
    * @throws IOException 
    */
   public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
-      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, byte[] dropDeletesFromRow,
+      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow,
       byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
     this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs,
-        oldestUnexpiredTS, regionCoprocessorHost);
+        oldestUnexpiredTS, now, regionCoprocessorHost);
     Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null));
     this.dropDeletesFromRow = dropDeletesFromRow;
     this.dropDeletesToRow = dropDeletesToRow;
@@ -239,10 +246,10 @@ public class ScanQueryMatcher {
    * Constructor for tests
    */
   ScanQueryMatcher(Scan scan, ScanInfo scanInfo,
-      NavigableSet<byte[]> columns, long oldestUnexpiredTS) throws IOException {
+      NavigableSet<byte[]> columns, long oldestUnexpiredTS, long now) throws IOException {
     this(scan, scanInfo, columns, ScanType.USER_SCAN,
           Long.MAX_VALUE, /* max Readpoint to track versions */
-        HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, null);
+        HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, null);
   }
 
   /**
@@ -300,12 +307,17 @@ public class ScanQueryMatcher {
 
     int qualifierOffset = cell.getQualifierOffset();
     int qualifierLength = cell.getQualifierLength();
+
     long timestamp = cell.getTimestamp();
     // check for early out based on timestamp alone
     if (columns.isDone(timestamp)) {
       return columns.getNextRowOrNextColumn(cell.getQualifierArray(), qualifierOffset,
           qualifierLength);
     }
+    // check if the cell is expired by cell TTL
+    if (HStore.isCellTTLExpired(cell, this.oldestUnexpiredTS, this.now)) {
+      return MatchCode.SKIP;
+    }
 
     /*
      * The delete logic is pretty complicated now.

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index ab7370c..9db116e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -77,6 +77,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   protected final Scan scan;
   protected final NavigableSet<byte[]> columns;
   protected final long oldestUnexpiredTS;
+  protected final long now;
   protected final int minVersions;
   protected final long maxRowSize;
 
@@ -123,7 +124,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     explicitColumnQuery = numCol > 0;
     this.scan = scan;
     this.columns = columns;
-    oldestUnexpiredTS = EnvironmentEdgeManager.currentTime() - ttl;
+    this.now = EnvironmentEdgeManager.currentTime();
+    this.oldestUnexpiredTS = now - ttl;
     this.minVersions = minVersions;
 
     if (store != null && ((HStore)store).getHRegion() != null
@@ -176,7 +178,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
-        oldestUnexpiredTS, store.getCoprocessorHost());
+        oldestUnexpiredTS, now, store.getCoprocessorHost());
 
     this.store.addChangedReaderObserver(this);
 
@@ -241,10 +243,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
         ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
     if (dropDeletesFromRow == null) {
       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
-          earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost());
+          earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
     } else {
       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
-          oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
+          oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
     }
 
     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
@@ -284,7 +286,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
         scanInfo.getMinVersions(), readPt);
     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
-        Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null);
+        Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
 
     // In unit tests, the store could be null
     if (this.store != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
new file mode 100644
index 0000000..a5cceb0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestImportTSVWithTTLs implements Configurable {
+
+  protected static final Log LOG = LogFactory.getLog(TestImportTSVWithTTLs.class);
+  protected static final String NAME = TestImportTsv.class.getSimpleName();
+  protected static HBaseTestingUtility util = new HBaseTestingUtility();
+
+  /**
+   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
+   * false.
+   */
+  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
+
+  /**
+   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
+   */
+  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
+
+  private final String FAMILY = "FAM";
+  private static Configuration conf;
+
+  @Override
+  public Configuration getConf() {
+    return util.getConfiguration();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    throw new IllegalArgumentException("setConf not supported");
+  }
+
+  @BeforeClass
+  public static void provisionCluster() throws Exception {
+    conf = util.getConfiguration();
+    // We don't check persistence in HFiles in this test, but if we ever do we will
+    // need this where the default hfile version is not 3 (i.e. 0.98)
+    conf.setInt("hfile.format.version", 3);
+    conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName());
+    util.startMiniCluster();
+    util.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void releaseCluster() throws Exception {
+    util.shutdownMiniMapReduceCluster();
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMROnTable() throws Exception {
+    String tableName = "test-" + UUID.randomUUID();
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.MAPPER_CONF_KEY
+            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
+    String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n";
+    util.createTable(TableName.valueOf(tableName), FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1);
+    util.deleteTable(tableName);
+  }
+
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+      String[] args, int valueMultiplier) throws Exception {
+    TableName table = TableName.valueOf(args[args.length - 1]);
+    Configuration conf = new Configuration(util.getConfiguration());
+
+    // populate input file
+    FileSystem fs = FileSystem.get(conf);
+    Path inputPath = fs.makeQualified(new Path(util
+        .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
+    FSDataOutputStream op = fs.create(inputPath, true);
+    op.write(Bytes.toBytes(data));
+    op.close();
+    LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+      LOG.debug("Forcing combiner.");
+      conf.setInt("mapreduce.map.combine.minspills", 1);
+    }
+
+    // run the import
+    List<String> argv = new ArrayList<String>(Arrays.asList(args));
+    argv.add(inputPath.toString());
+    Tool tool = new ImportTsv();
+    LOG.debug("Running ImportTsv with arguments: " + argv);
+    try {
+      // Job will fail if observer rejects entries without TTL
+      assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
+    } finally {
+      // Clean up
+      if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+        LOG.debug("Deleting test subdirectory");
+        util.cleanupDataTestDirOnTestFS(table.getNameAsString());
+      }
+    }
+
+    return tool;
+  }
+
+  public static class TTLCheckingObserver extends BaseRegionObserver {
+
+    @Override
+    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
+        Durability durability) throws IOException {
+      HRegion region = e.getEnvironment().getRegion();
+      if (!region.getRegionInfo().isMetaTable()
+          && !region.getRegionInfo().getTable().isSystemTable()) {
+        // The put carries the TTL attribute
+        if (put.getTTL() != Long.MAX_VALUE) {
+          return;
+        }
+        throw new IOException("Operation does not have TTL set");
+      }
+    }
+  }
+}
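
For reference, the client-side equivalent of what this test drives through
ImportTsv is the new Mutation#setTTL API. A minimal sketch, assuming a table
handle named "table" and illustrative row/family/qualifier names:

    // Cells written by this Put expire ~5 seconds after their timestamp.
    // TTLs are given in milliseconds; an unset TTL reads back as Long.MAX_VALUE.
    Put put = new Put(Bytes.toBytes("row1"));
    put.add(Bytes.toBytes("FAM"), Bytes.toBytes("A"), Bytes.toBytes("value"));
    put.setTTL(5000L);
    table.put(put);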

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index ffaa347..7c06eca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -90,6 +91,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
@@ -111,6 +113,7 @@ import org.apache.hadoop.hbase.filter.NullComparator;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -140,6 +143,7 @@ import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
@@ -5767,6 +5771,133 @@ public class TestHRegion {
     }
   }
 
+  @Test
+  public void testCellTTLs() throws IOException {
+    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(edge);
+
+    final byte[] row = Bytes.toBytes("testRow");
+    final byte[] q1 = Bytes.toBytes("q1");
+    final byte[] q2 = Bytes.toBytes("q2");
+    final byte[] q3 = Bytes.toBytes("q3");
+    final byte[] q4 = Bytes.toBytes("q4");
+
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testCellTTLs"));
+    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
+    hcd.setTimeToLive(10); // 10 seconds
+    htd.addFamily(hcd);
+
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
+
+    HRegion region = HRegion.createHRegion(new HRegionInfo(htd.getTableName(),
+        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
+      TEST_UTIL.getDataTestDir(), conf, htd);
+    assertNotNull(region);
+    try {
+      long now = EnvironmentEdgeManager.currentTime();
+      // Add a cell that will expire in 5 seconds via cell TTL
+      region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
+        HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+          // TTL tags specify ts in milliseconds
+          new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+      // Add a cell that will expire after 10 seconds via family setting
+      region.put(new Put(row).add(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
+      // Add a cell that will expire in 15 seconds via cell TTL
+      region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
+        HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+          // TTL tags specify ts in milliseconds
+          new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+      // Add a cell that will expire in 20 seconds via family setting
+      region.put(new Put(row).add(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
+
+      // Flush so we are sure store scanning gets this right
+      region.flushcache();
+
+      // A query at time T+0 should return all cells
+      Result r = region.get(new Get(row));
+      assertNotNull(r.getValue(fam1, q1));
+      assertNotNull(r.getValue(fam1, q2));
+      assertNotNull(r.getValue(fam1, q3));
+      assertNotNull(r.getValue(fam1, q4));
+
+      // Increment time to T+5 seconds
+      edge.incrementTime(5000);
+
+      r = region.get(new Get(row));
+      assertNull(r.getValue(fam1, q1));
+      assertNotNull(r.getValue(fam1, q2));
+      assertNotNull(r.getValue(fam1, q3));
+      assertNotNull(r.getValue(fam1, q4));
+
+      // Increment time to T+10 seconds
+      edge.incrementTime(5000);
+
+      r = region.get(new Get(row));
+      assertNull(r.getValue(fam1, q1));
+      assertNull(r.getValue(fam1, q2));
+      assertNotNull(r.getValue(fam1, q3));
+      assertNotNull(r.getValue(fam1, q4));
+
+      // Increment time to T+15 seconds
+      edge.incrementTime(5000);
+
+      r = region.get(new Get(row));
+      assertNull(r.getValue(fam1, q1));
+      assertNull(r.getValue(fam1, q2));
+      assertNull(r.getValue(fam1, q3));
+      assertNotNull(r.getValue(fam1, q4));
+
+      // Increment time to T+20 seconds
+      edge.incrementTime(10000);
+
+      r = region.get(new Get(row));
+      assertNull(r.getValue(fam1, q1));
+      assertNull(r.getValue(fam1, q2));
+      assertNull(r.getValue(fam1, q3));
+      assertNull(r.getValue(fam1, q4));
+
+      // Fun with disappearing increments
+
+      // Start at 1
+      region.put(new Put(row).add(fam1, q1, Bytes.toBytes(1L)));
+      r = region.get(new Get(row));
+      byte[] val = r.getValue(fam1, q1);
+      assertNotNull(val);
+      assertEquals(Bytes.toLong(val), 1L);
+
+      // Increment with a TTL of 5 seconds
+      Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
+      incr.setTTL(5000);
+      region.increment(incr); // 2
+
+      // New value should be 2
+      r = region.get(new Get(row));
+      val = r.getValue(fam1, q1);
+      assertNotNull(val);
+      assertEquals(Bytes.toLong(val), 2L);
+
+      // Increment time to T+25 seconds
+      edge.incrementTime(5000);
+
+      // Value should be back to 1
+      r = region.get(new Get(row));
+      val = r.getValue(fam1, q1);
+      assertNotNull(val);
+      assertEquals(Bytes.toLong(val), 1L);
+
+      // Increment time to T+30 seconds
+      edge.incrementTime(5000);
+
+      // Original value written at T+20 should be gone now via family TTL
+      r = region.get(new Get(row));
+      assertNull(r.getValue(fam1, q1));
+
+    } finally {
+      HRegion.closeHRegion(region);
+    }
+  }
+
   private static HRegion initHRegion(byte[] tableName, String callingMethod,
       byte[]... families) throws IOException {
     return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
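
The timeline in testCellTTLs above encodes the intended interaction of the two
TTL mechanisms: a cell-level TTL can shorten a cell's lifetime relative to the
family-level TTL but not extend it, and both count from the cell's timestamp.
A sketch of that rule (an illustrative helper, not code from the patch):

    // True if a cell written at ts is expired at time now, given a per-cell
    // TTL (Long.MAX_VALUE when unset) and the family TTL, both in milliseconds.
    static boolean isExpired(long now, long ts, long cellTtlMs, long familyTtlMs) {
      long effectiveTtl = Math.min(cellTtlMs, familyTtlMs);
      return now - ts >= effectiveTtl;
    }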

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
index 97c66e4..6476288 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
@@ -95,11 +95,12 @@ public class TestQueryMatcher extends HBaseTestCase {
 
   }
 
-    private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
-    // 2,4,5    
+  private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
+    long now = EnvironmentEdgeManager.currentTime();
+    // 2,4,5
     ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
         0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), get.getFamilyMap().get(fam2),
-        EnvironmentEdgeManager.currentTime() - ttl);
+        now - ttl, now);
 
     List<KeyValue> memstore = new ArrayList<KeyValue>();
     memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -182,9 +183,10 @@ public class TestQueryMatcher extends HBaseTestCase {
     expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
     expected.add(ScanQueryMatcher.MatchCode.DONE);
 
+    long now = EnvironmentEdgeManager.currentTime();
     ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
         0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null,
-        EnvironmentEdgeManager.currentTime() - ttl);
+        now - ttl, now);
 
     List<KeyValue> memstore = new ArrayList<KeyValue>();
     memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -238,8 +240,8 @@ public class TestQueryMatcher extends HBaseTestCase {
 
     long now = EnvironmentEdgeManager.currentTime();
     ScanQueryMatcher qm =
-        new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
-            rowComparator), get.getFamilyMap().get(fam2), now - testTTL);
+      new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
+        rowComparator), get.getFamilyMap().get(fam2), now - testTTL, now);
 
     KeyValue [] kvs = new KeyValue[] {
         new KeyValue(row1, fam2, col1, now-100, data),
@@ -294,7 +296,7 @@ public class TestQueryMatcher extends HBaseTestCase {
     long now = EnvironmentEdgeManager.currentTime();
     ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
         0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null,
-        now - testTTL);
+        now - testTTL, now);
 
     KeyValue [] kvs = new KeyValue[] {
         new KeyValue(row1, fam2, col1, now-100, data),
@@ -353,7 +355,7 @@ public class TestQueryMatcher extends HBaseTestCase {
     NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
 
     ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,
-        HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to, null);
+        HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, now, from, to, null);
     List<ScanQueryMatcher.MatchCode> actual =
         new ArrayList<ScanQueryMatcher.MatchCode>(rows.length);
     byte[] prevRow = null;
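
The constructor changes above all follow one pattern: besides oldestUnexpiredTs
(now minus the family TTL), the matcher now also receives now itself, because a
per-cell TTL is read from each cell's own tag and cannot be folded into a
single precomputed cutoff. Roughly (ttlFromTag is a hypothetical helper, not
part of the patch):

    boolean familyExpired = cell.getTimestamp() < oldestUnexpiredTs;    // one cutoff for all cells
    boolean cellExpired = cell.getTimestamp() + ttlFromTag(cell) < now; // needs "now" per cell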

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
index daee137..c0dcee6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
 @Category({RegionServerTests.class, SmallTests.class})
@@ -36,7 +35,6 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
 
   final static int VERSIONS = 2;
 
-  @Test
   public void testCheckColumn_Ok() throws IOException {
     ScanWildcardColumnTracker tracker =
       new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
@@ -70,7 +68,6 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
     }
   }
 
-  @Test
   public void testCheckColumn_EnforceVersions() throws IOException {
     ScanWildcardColumnTracker tracker =
       new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
index b982977..eaea83e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
@@ -419,8 +419,13 @@ public class TestTags {
       tags = TestCoprocessorForTags.tags;
       assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
       assertEquals(2, tags.size());
-      assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
-      assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+      // We cannot assume the ordering of tags
+      List<String> tagValues = new ArrayList<String>();
+      for (Tag tag: tags) {
+        tagValues.add(Bytes.toString(tag.getValue()));
+      }
+      assertTrue(tagValues.contains("tag1"));
+      assertTrue(tagValues.contains("tag2"));
       TestCoprocessorForTags.checkTagPresence = false;
       TestCoprocessorForTags.tags = null;
 
@@ -476,8 +481,13 @@ public class TestTags {
       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
       tags = TestCoprocessorForTags.tags;
       assertEquals(2, tags.size());
-      assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
-      assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+      // We cannot assume the ordering of tags
+      tagValues.clear();
+      for (Tag tag: tags) {
+        tagValues.add(Bytes.toString(tag.getValue()));
+      }
+      assertTrue(tagValues.contains("tag1"));
+      assertTrue(tagValues.contains("tag2"));
       TestCoprocessorForTags.checkTagPresence = false;
       TestCoprocessorForTags.tags = null;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-shell/src/main/ruby/hbase/table.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/table.rb b/hbase-shell/src/main/ruby/hbase/table.rb
index 46aca19..d53a82e 100644
--- a/hbase-shell/src/main/ruby/hbase/table.rb
+++ b/hbase-shell/src/main/ruby/hbase/table.rb
@@ -141,17 +141,19 @@ EOF
          set_attributes(p, attributes) if attributes
          visibility = args[VISIBILITY]
          set_cell_visibility(p, visibility) if visibility
+         ttl = args[TTL]
+         set_op_ttl(p, ttl) if ttl
       end
       #Case where attributes are specified without timestamp
       if timestamp.kind_of?(Hash)
       	timestamp.each do |k, v|
-      	  if v.kind_of?(Hash)
-      	  	set_attributes(p, v) if v
-      	  end
-      	  if v.kind_of?(String)
-      	  	set_cell_visibility(p, v) if v
-      	  end
-      	  
+          if k == 'ATTRIBUTES'
+            set_attributes(p, v)
+          elsif k == 'VISIBILITY'
+            set_cell_visibility(p, v)
+          elsif k == 'TTL'
+            set_op_ttl(p, v)
+          end
         end
         timestamp = nil
       end  
@@ -219,6 +221,8 @@ EOF
       	visibility = args[VISIBILITY]
         set_attributes(incr, attributes) if attributes
         set_cell_visibility(incr, visibility) if visibility
+        ttl = args[TTL]
+        set_op_ttl(incr, ttl) if ttl
       end
       incr.addColumn(family, qualifier, value)
       @table.increment(incr)
@@ -237,6 +241,8 @@ EOF
       	visibility = args[VISIBILITY]
         set_attributes(append, attributes) if attributes
         set_cell_visibility(append, visibility) if visibility
+        ttl = args[TTL]
+        set_op_ttl(append, ttl) if ttl
       end
       append.add(family, qualifier, value.to_s.to_java_bytes)
       @table.append(append)
@@ -545,6 +551,10 @@ EOF
           auths.to_java(:string)))
     end
 
+    def set_op_ttl(op, ttl)
+      op.setTTL(ttl.to_java(:long))
+    end
+
     #----------------------------
     # Add general administration utilities to the shell
     # each of the names below adds this method name to the table

http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-shell/src/test/ruby/hbase/table_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/table_test.rb b/hbase-shell/src/test/ruby/hbase/table_test.rb
index 7272229..fa2990d 100644
--- a/hbase-shell/src/test/ruby/hbase/table_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/table_test.rb
@@ -530,5 +530,18 @@ module Hbase
       end
     end
 
+    define_test "mutation with TTL should expire" do
+      @test_table.put('ttlTest', 'x:a', 'foo', { TTL => 1000 } )
+      begin
+        res = @test_table._get_internal('ttlTest', 'x:a')
+        assert_not_nil(res)
+        sleep 2
+        res = @test_table._get_internal('ttlTest', 'x:a')
+        assert_nil(res)
+      ensure
+        @test_table.delete('ttlTest', 'x:a')
+      end
+    end
+
   end
 end


[2/3] hbase git commit: HBASE-10560 Per cell TTLs

Posted by ap...@apache.org.
HBASE-10560 Per cell TTLs


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/004e977b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/004e977b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/004e977b

Branch: refs/heads/branch-1
Commit: 004e977ba0009fc8978d842eb65933a62fdf1173
Parents: 1bd27bf
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Dec 5 11:00:19 2014 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Dec 5 11:11:15 2014 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/Append.java  |   5 +
 .../org/apache/hadoop/hbase/client/Delete.java  |   5 +
 .../apache/hadoop/hbase/client/Increment.java   |   5 +
 .../apache/hadoop/hbase/client/Mutation.java    |  34 +++
 .../org/apache/hadoop/hbase/client/Put.java     |   5 +
 .../java/org/apache/hadoop/hbase/TagType.java   |   1 +
 .../hadoop/hbase/mapreduce/CellCreator.java     |  35 ++-
 .../hadoop/hbase/mapreduce/ImportTsv.java       |  55 ++++-
 .../hadoop/hbase/mapreduce/TextSortReducer.java |  24 +-
 .../hbase/mapreduce/TsvImporterMapper.java      |  26 ++-
 .../GetClosestRowBeforeTracker.java             |  25 ++-
 .../hadoop/hbase/regionserver/HRegion.java      | 224 ++++++++++++++-----
 .../hadoop/hbase/regionserver/HStore.java       |  36 ++-
 .../hbase/regionserver/ScanQueryMatcher.java    |  26 ++-
 .../hadoop/hbase/regionserver/StoreScanner.java |  12 +-
 .../hbase/mapreduce/TestImportTSVWithTTLs.java  | 172 ++++++++++++++
 .../hadoop/hbase/regionserver/TestHRegion.java  | 131 +++++++++++
 .../hbase/regionserver/TestQueryMatcher.java    |  18 +-
 .../hadoop/hbase/regionserver/TestTags.java     |  18 +-
 hbase-shell/src/main/ruby/hbase/table.rb        |  24 +-
 hbase-shell/src/test/ruby/hbase/table_test.rb   |  13 ++
 21 files changed, 792 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
index 66e8d32..58c204b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
@@ -183,4 +183,9 @@ public class Append extends Mutation {
   public Append setACL(Map<String, Permission> perms) {
     return (Append) super.setACL(perms);
   }
+
+  @Override
+  public Append setTTL(long ttl) {
+    return (Append) super.setTTL(ttl);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
index 7447147..4bbcb27 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
@@ -473,4 +473,9 @@ public class Delete extends Mutation implements Comparable<Row> {
   public Delete setACL(Map<String, Permission> perms) {
     return (Delete) super.setACL(perms);
   }
+
+  @Override
+  public Delete setTTL(long ttl) {
+    throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported");
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
index e2d8387..af0ea56 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
@@ -329,4 +329,9 @@ public class Increment extends Mutation implements Comparable<Row> {
   public Increment setACL(Map<String, Permission> perms) {
     return (Increment) super.setACL(perms);
   }
+
+  @Override
+  public Increment setTTL(long ttl) {
+    return (Increment) super.setTTL(ttl);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index 1ebff93..dbc1317 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -78,6 +78,11 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
    */
   private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
 
+  /**
+   * The attribute for storing TTL for the result of the mutation.
+   */
+  private static final String OP_ATTRIBUTE_TTL = "_ttl";
+
   protected byte [] row = null;
   protected long ts = HConstants.LATEST_TIMESTAMP;
   protected Durability durability = Durability.USE_DEFAULT;
@@ -201,6 +206,12 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
     if (getId() != null) {
       map.put("id", getId());
     }
+    // Add the TTL if set
+    // Long.MAX_VALUE is the default, and is interpreted to mean this attribute
+    // has not been set.
+    if (getTTL() != Long.MAX_VALUE) {
+      map.put("ttl", getTTL());
+    }
     return map;
   }
 
@@ -474,6 +485,29 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
   }
 
   /**
+   * Return the TTL requested for the result of the mutation, in milliseconds.
+   * @return the TTL requested for the result of the mutation, in milliseconds,
+   * or Long.MAX_VALUE if unset
+   */
+  public long getTTL() {
+    byte[] ttlBytes = getAttribute(OP_ATTRIBUTE_TTL);
+    if (ttlBytes != null) {
+      return Bytes.toLong(ttlBytes);
+    }
+    return Long.MAX_VALUE;
+  }
+
+  /**
+   * Set the TTL desired for the result of the mutation, in milliseconds.
+   * @param ttl the TTL desired for the result of the mutation, in milliseconds
+   * @return this
+   */
+  public Mutation setTTL(long ttl) {
+    setAttribute(OP_ATTRIBUTE_TTL, Bytes.toBytes(ttl));
+    return this;
+  }
+
+  /**
    * Subclasses should override this method to add the heap size of their own fields.
    * @return the heap size to add (will be aligned).
    */
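
The TTL round-trips through an ordinary operation attribute, so it is carried
to the server like any other attribute and the API is uniform across Put,
Append and Increment. A minimal sketch:

    Increment incr = new Increment(Bytes.toBytes("row1"));
    incr.addColumn(Bytes.toBytes("f"), Bytes.toBytes("q"), 1L);
    incr.setTTL(5000L);                                // milliseconds
    assert incr.getTTL() == 5000L;                     // stored under the "_ttl" attribute
    assert new Put(Bytes.toBytes("row1")).getTTL() == Long.MAX_VALUE; // unset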

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
index 20d445a..0fb0118 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
@@ -472,4 +472,9 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
   public Put setACL(Map<String, Permission> perms) {
     return (Put) super.setACL(perms);
   }
+
+  @Override
+  public Put setTTL(long ttl) {
+    return (Put) super.setTTL(ttl);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index f088dcf..2095b7a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -30,4 +30,5 @@ public final class TagType {
   public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
   // String based tag type used in replication
   public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
+  public static final byte TTL_TAG_TYPE = (byte)8;
 }
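
Server side, the TTL travels as a tag of this new type whose value is the
eight-byte encoding of the TTL in milliseconds, as used throughout this patch:

    Tag ttlTag = new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)); // 5 second TTL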

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
index b3dfee7..001f64d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
@@ -69,7 +69,7 @@ public class CellCreator {
       byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
       int vlength) throws IOException {
     return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength,
-        timestamp, value, voffset, vlength, null);
+        timestamp, value, voffset, vlength, (List<Tag>)null);
   }
 
   /**
@@ -90,6 +90,7 @@ public class CellCreator {
    * @return created Cell
    * @throws IOException
    */
+  @Deprecated
   public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
       byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
       int vlength, String visExpression) throws IOException {
@@ -100,4 +101,36 @@ public class CellCreator {
     return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
         qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags);
   }
+
+  /**
+   * @param row row key
+   * @param roffset row offset
+   * @param rlength row length
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   * @param timestamp version timestamp
+   * @param value column value
+   * @param voffset value offset
+   * @param vlength value length
+   * @param tags tags to attach to the created cell; may be null or empty
+   * @return created Cell
+   * @throws IOException
+   */
+  public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+      byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+      int vlength, List<Tag> tags) throws IOException {
+    return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
+        qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags);
+  }
+
+  /**
+   * @return Visibility expression resolver
+   */
+  public VisibilityExpressionResolver getVisibilityExpressionResolver() {
+    return this.visExpResolver;
+  }
 }
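
Callers that previously passed a raw visibility expression now resolve it to
tags themselves, which lets them combine visibility tags with others such as a
TTL in a single list. A sketch following how TextSortReducer uses this later in
the patch (variable names are illustrative):

    List<Tag> tags = new ArrayList<Tag>();
    if (cellVisibilityExpr != null) {
      tags.addAll(kvCreator.getVisibilityExpressionResolver()
          .createVisibilityExpTags(cellVisibilityExpr));
    }
    if (ttl > 0) {
      tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
    }
    Cell cell = kvCreator.create(row, 0, row.length, fam, 0, fam.length,
        qual, 0, qual.length, ts, value, 0, value.length, tags);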

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index 7ae6599..54e0034 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -125,13 +125,20 @@ public class ImportTsv extends Configured implements Tool {
 
     public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
 
+    public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL";
+
     private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
 
     public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
 
     public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
 
+    public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1;
+
     private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+
+    private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX;
+
     /**
      * @param columnsSpecification the list of columns to parser out, comma separated.
      * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
@@ -162,14 +169,18 @@ public class ImportTsv extends Configured implements Tool {
           timestampKeyColumnIndex = i;
           continue;
         }
-        if(ATTRIBUTES_COLUMN_SPEC.equals(str)) {
+        if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
           attrKeyColumnIndex = i;
           continue;
         }
-        if(CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
+        if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
           cellVisibilityColumnIndex = i;
           continue;
         }
+        if (CELL_TTL_COLUMN_SPEC.equals(str)) {
+          cellTTLColumnIndex = i;
+          continue;
+        }
         String[] parts = str.split(":", 2);
         if (parts.length == 1) {
           families[i] = str.getBytes();
@@ -197,6 +208,10 @@ public class ImportTsv extends Configured implements Tool {
       return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
     }
 
+    public boolean hasCellTTL() {
+      return cellTTLColumnIndex != DEFAULT_CELL_TTL_COLUMN_INDEX;
+    }
+
     public int getAttributesKeyColumnIndex() {
       return attrKeyColumnIndex;
     }
@@ -204,9 +219,15 @@ public class ImportTsv extends Configured implements Tool {
     public int getCellVisibilityColumnIndex() {
       return cellVisibilityColumnIndex;
     }
+
+    public int getCellTTLColumnIndex() {
+      return cellTTLColumnIndex;
+    }
+
     public int getRowKeyColumnIndex() {
       return rowKeyColumnIndex;
     }
+
     public byte[] getFamily(int idx) {
       return families[idx];
     }
@@ -238,8 +259,10 @@ public class ImportTsv extends Configured implements Tool {
         throw new BadTsvLineException("No timestamp");
       } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
         throw new BadTsvLineException("No attributes specified");
-      } else if(hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
+      } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
         throw new BadTsvLineException("No cell visibility specified");
+      } else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) {
+        throw new BadTsvLineException("No cell TTL specified");
       }
       return new ParsedLine(tabOffsets, lineBytes);
     }
@@ -336,6 +359,31 @@ public class ImportTsv extends Configured implements Tool {
         }
       }
 
+      public int getCellTTLColumnOffset() {
+        if (hasCellTTL()) {
+          return getColumnOffset(cellTTLColumnIndex);
+        } else {
+          return DEFAULT_CELL_TTL_COLUMN_INDEX;
+        }
+      }
+
+      public int getCellTTLColumnLength() {
+        if (hasCellTTL()) {
+          return getColumnLength(cellTTLColumnIndex);
+        } else {
+          return DEFAULT_CELL_TTL_COLUMN_INDEX;
+        }
+      }
+
+      public long getCellTTL() {
+        if (!hasCellTTL()) {
+          return 0;
+        } else {
+          return Bytes.toLong(lineBytes, getColumnOffset(cellTTLColumnIndex),
+              getColumnLength(cellTTLColumnIndex));
+        }
+      }
+
       public int getColumnOffset(int idx) {
         if (idx > 0)
           return tabOffsets.get(idx - 1) + 1;
@@ -535,6 +583,7 @@ public class ImportTsv extends Configured implements Tool {
       if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
           || TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
           || TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
+          || TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn)
           || TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
         continue;
       // we are only concerned with the first one (in case this is a cf:cq)
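
Declaring a TTL column follows the same convention as the other special
columns; for example, the arguments used by TestImportTSVWithTTLs earlier in
this mail:

    String[] args = new String[] {
        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };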

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
index 4a0e0fd..b3981a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -28,8 +30,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -62,6 +67,9 @@ public class TextSortReducer extends
   /** Cell visibility expr **/
   private String cellVisibilityExpr;
 
+  /** Cell TTL */
+  private long ttl;
+
   private CellCreator kvCreator;
 
   public long getTs() {
@@ -148,18 +156,30 @@ public class TextSortReducer extends
           // Retrieve timestamp if exists
           ts = parsed.getTimestamp(ts);
           cellVisibilityExpr = parsed.getCellVisibility();
+          ttl = parsed.getCellTTL();
 
           for (int i = 0; i < parsed.getColumnCount(); i++) {
             if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
-                || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
+                || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
+                || i == parser.getCellTTLColumnIndex()) {
               continue;
             }
             // Creating the KV which needs to be directly written to HFiles. Using the Facade
             // KVCreator for creation of kvs.
+            List<Tag> tags = new ArrayList<Tag>();
+            if (cellVisibilityExpr != null) {
+              tags.addAll(kvCreator.getVisibilityExpressionResolver()
+                .createVisibilityExpTags(cellVisibilityExpr));
+            }
+            // Add the TTL directly to the KV as a tag so that TTLs can vary per KV
+            // when packing more than one KV into a put
+            if (ttl > 0) {
+              tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+            }
             Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
                 parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
                 parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
-                parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
+                parsed.getColumnOffset(i), parsed.getColumnLength(i), tags);
             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
             kvs.add(kv);
             curSize += kv.heapSize();

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
index ff84081..270de75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
@@ -18,17 +18,22 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
 import org.apache.hadoop.hbase.security.visibility.CellVisibility;
 import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Counter;
@@ -59,6 +64,8 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
 
   protected String cellVisibilityExpr;
 
+  protected long ttl;
+
   protected CellCreator kvCreator;
 
   private String hfileOutPath;
@@ -144,11 +151,13 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
       // Retrieve timestamp if exists
       ts = parsed.getTimestamp(ts);
       cellVisibilityExpr = parsed.getCellVisibility();
+      ttl = parsed.getCellTTL();
 
       Put put = new Put(rowKey.copyBytes());
       for (int i = 0; i < parsed.getColumnCount(); i++) {
         if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
-            || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
+            || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
+            || i == parser.getCellTTLColumnIndex()) {
           continue;
         }
         populatePut(lineBytes, parsed, put, i);
@@ -192,13 +201,26 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
         // the validation
         put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
       }
+      if (ttl > 0) {
+        put.setTTL(ttl);
+      }
     } else {
       // Creating the KV which needs to be directly written to HFiles. Using the Facade
       // KVCreator for creation of kvs.
+      List<Tag> tags = new ArrayList<Tag>();
+      if (cellVisibilityExpr != null) {
+        tags.addAll(kvCreator.getVisibilityExpressionResolver()
+          .createVisibilityExpTags(cellVisibilityExpr));
+      }
+      // Add the TTL directly to the KV as a tag so that TTLs can vary per KV
+      // when packing more than one KV into a put
+      if (ttl > 0) {
+        tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+      }
       cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
           parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
           parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
-          parsed.getColumnLength(i), cellVisibilityExpr);
+          parsed.getColumnLength(i), tags);
     }
     put.add(cell);
   }
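
Note the two delivery paths above: on the normal RPC path the TTL rides along
as the operation attribute and the server converts it into cell tags, while
the bulk-load (HFile output) path must bake the TTL tag into each KeyValue
itself, since bulk-loaded files bypass server-side mutation processing.
Schematically:

    if (hfileOutPath == null) {
      put.setTTL(ttl);  // the server rewrites the cells with a TTL tag at write time
    } else {
      tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl))); // written into the HFile
    }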

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
index 2f9b0be..ae41844 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}.
- * Like {@link ScanDeleteTracker} and {@link ScanDeleteTracker} but does not
+ * Like {@link ScanQueryMatcher} and {@link ScanDeleteTracker} but does not
  * implement the {@link DeleteTracker} interface since state spans rows (There
  * is no update nor reset method).
  */
@@ -42,7 +42,8 @@ import org.apache.hadoop.hbase.util.Bytes;
 class GetClosestRowBeforeTracker {
   private final KeyValue targetkey;
   // Any cell w/ a ts older than this is expired.
-  private final long oldestts;
+  private final long now;
+  private final long oldestUnexpiredTs;
   private Cell candidate = null;
   private final KVComparator kvcomparator;
   // Flag for whether we're doing getclosest on a metaregion.
@@ -75,20 +76,13 @@ class GetClosestRowBeforeTracker {
         HConstants.DELIMITER) - this.rowoffset;
     }
     this.tablenamePlusDelimiterLength = metaregion? l + 1: -1;
-    this.oldestts = System.currentTimeMillis() - ttl;
+    this.now = System.currentTimeMillis();
+    this.oldestUnexpiredTs = now - ttl;
     this.kvcomparator = c;
     KeyValue.RowOnlyComparator rc = new KeyValue.RowOnlyComparator(this.kvcomparator);
     this.deletes = new TreeMap<KeyValue, NavigableSet<KeyValue>>(rc);
   }
 
-  /**
-   * @param kv
-   * @return True if this <code>kv</code> is expired.
-   */
-  boolean isExpired(final Cell kv) {
-    return HStore.isExpired(kv, this.oldestts);
-  }
-
   /*
    * Add the specified KeyValue to the list of deletes.
    * @param kv
@@ -173,6 +167,15 @@ class GetClosestRowBeforeTracker {
     return false;
   }
 
+  /**
+   * @param cell
+   * @return true if the cell is expired
+   */
+  public boolean isExpired(final Cell cell) {
+    return cell.getTimestamp() < this.oldestUnexpiredTs ||
+      HStore.isCellTTLExpired(cell, this.oldestUnexpiredTs, this.now);
+  }
+
   /*
    * Handle keys whose values hold deletes.
    * Add to the set of deletes and then if the candidate keys contain any that
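
HStore.isCellTTLExpired is introduced in the HStore changes of this patch; a
plausible shape for that check, shown here only as a sketch:

    static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
      if (cell.getTagsLength() > 0) {
        Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
            cell.getTagsOffset(), cell.getTagsLength());
        while (i.hasNext()) {
          Tag t = i.next();
          if (t.getType() == TagType.TTL_TAG_TYPE) {
            // TTL tags hold milliseconds; the cell expires ttl ms after its timestamp
            return cell.getTimestamp() + Bytes.toLong(t.getValue()) < now;
          }
        }
      }
      return false;
    }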

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ffc9cba..67cd70b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -85,6 +85,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Append;
@@ -2644,6 +2646,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
           noOfDeletes++;
         }
+        rewriteCellTags(familyMaps[i], mutation);
       }
 
       lock(this.updatesLock.readLock(), numReadyToWrite);
@@ -3118,6 +3121,59 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     }
   }
 
+  /**
+   * Possibly rewrite incoming cell tags.
+   */
+  void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
+    // Check if we have any work to do and early out otherwise
+    // Update these checks as more logic is added here
+
+    if (m.getTTL() == Long.MAX_VALUE) {
+      return;
+    }
+
+    // From this point we know we have some work to do
+
+    for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
+      List<Cell> cells = e.getValue();
+      assert cells instanceof RandomAccess;
+      int listSize = cells.size();
+      for (int i = 0; i < listSize; i++) {
+        Cell cell = cells.get(i);
+        List<Tag> newTags = new ArrayList<Tag>();
+        Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
+          cell.getTagsOffset(), cell.getTagsLength());
+
+        // Carry forward existing tags
+
+        while (tagIterator.hasNext()) {
+
+          // Add any filters or tag specific rewrites here
+
+          newTags.add(tagIterator.next());
+        }
+
+        // Cell TTL handling
+
+        // Check again if we need to add a cell TTL because early out logic
+        // above may change when there are more tag based features in core.
+        if (m.getTTL() != Long.MAX_VALUE) {
+          // Add a cell TTL tag
+          newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
+        }
+
+        // Rewrite the cell with the updated set of tags
+
+        cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+          cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+          cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+          cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
+          cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+          newTags));
+      }
+    }
+  }
+
   /*
    * Check if resources to support an update.
    *
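
To make the effect of rewriteCellTags concrete: a Put built as below (names
illustrative) reaches the memstore with every cell rebuilt to carry a TTL tag
alongside any tags it already had, while the attribute on the Mutation itself
is never persisted.

    Put p = new Put(row).add(fam, qual, value);
    p.setTTL(5000L); // becomes Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) on each cell
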
@@ -5215,6 +5271,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
           processor.preBatchMutate(this, walEdit);
           // 7. Apply to memstore
           for (Mutation m : mutations) {
+            // Handle any tag based cell features
+            rewriteCellTags(m.getFamilyCellMap(), m);
+
             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
               Cell cell = cellScanner.current();
               CellUtil.setSequenceId(cell, mvccNum);
@@ -5353,8 +5412,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
 
-  // TODO: There's a lot of boiler plate code identical
-  // to increment... See how to better unify that.
+  // TODO: There's a lot of boilerplate code identical to increment.
+  // We should refactor append and increment as local get-mutate-put
+  // transactions, so all stores only go through one code path for puts.
   /**
    * Perform one or more append operations on a row.
    *
@@ -5425,8 +5485,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             // Iterate the input columns and update existing values if they were
             // found, otherwise add new column initialized to the append value
 
-            // Avoid as much copying as possible. Every byte is copied at most
-            // once.
+            // Avoid as much copying as possible. We may need to rewrite and
+            // consolidate tags. Bytes are only copied once.
             // Would be nice if KeyValue had scatter/gather logic
             int idx = 0;
             for (Cell cell : family.getValue()) {
@@ -5436,40 +5496,87 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
                   && CellUtil.matchingQualifier(results.get(idx), cell)) {
                 oldCell = results.get(idx);
                 long ts = Math.max(now, oldCell.getTimestamp());
-                // allocate an empty kv once
+
+                // Process cell tags
+                List<Tag> newTags = new ArrayList<Tag>();
+
+                // Make a union of the set of tags in the old and new KVs
+
+                if (oldCell.getTagsLength() > 0) {
+                  Iterator<Tag> i = CellUtil.tagsIterator(oldCell.getTagsArray(),
+                    oldCell.getTagsOffset(), oldCell.getTagsLength());
+                  while (i.hasNext()) {
+                    newTags.add(i.next());
+                  }
+                }
+                if (cell.getTagsLength() > 0) {
+                  Iterator<Tag> i  = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+                    cell.getTagsLength());
+                  while (i.hasNext()) {
+                    newTags.add(i.next());
+                  }
+                }
+
+                // Cell TTL handling
+
+                if (append.getTTL() != Long.MAX_VALUE) {
+                  // Add the new TTL tag
+                  newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+                }
+
+                // Rebuild tags
+                byte[] tagBytes = Tag.fromList(newTags);
+
+                // allocate an empty cell once
                 newCell = new KeyValue(row.length, cell.getFamilyLength(),
                     cell.getQualifierLength(), ts, KeyValue.Type.Put,
                     oldCell.getValueLength() + cell.getValueLength(),
-                    oldCell.getTagsLength() + cell.getTagsLength());
-                // copy in the value
-                System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
-                    newCell.getValueArray(), newCell.getValueOffset(),
-                    oldCell.getValueLength());
-                System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
-                    newCell.getValueArray(),
-                    newCell.getValueOffset() + oldCell.getValueLength(),
-                    cell.getValueLength());
-                // copy in the tags
-                System.arraycopy(oldCell.getTagsArray(), oldCell.getTagsOffset(),
-                    newCell.getTagsArray(), newCell.getTagsOffset(), oldCell.getTagsLength());
-                System.arraycopy(cell.getTagsArray(), cell.getTagsOffset(), newCell.getTagsArray(),
-                    newCell.getTagsOffset() + oldCell.getTagsLength(), cell.getTagsLength());
+                    tagBytes.length);
                 // copy in row, family, and qualifier
                 System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
-                    newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
+                  newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
                 System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
-                    newCell.getFamilyArray(), newCell.getFamilyOffset(),
-                    cell.getFamilyLength());
+                  newCell.getFamilyArray(), newCell.getFamilyOffset(),
+                  cell.getFamilyLength());
                 System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
-                    newCell.getQualifierArray(), newCell.getQualifierOffset(),
-                    cell.getQualifierLength());
+                  newCell.getQualifierArray(), newCell.getQualifierOffset(),
+                  cell.getQualifierLength());
+                // copy in the value
+                System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
+                  newCell.getValueArray(), newCell.getValueOffset(),
+                  oldCell.getValueLength());
+                System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
+                  newCell.getValueArray(),
+                  newCell.getValueOffset() + oldCell.getValueLength(),
+                  cell.getValueLength());
+                // Copy in tag data
+                System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
+                  tagBytes.length);
                 idx++;
               } else {
-                // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
-                // so only need to update the timestamp to 'now'
+                // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP
                 CellUtil.updateLatestStamp(cell, now);
-                newCell = cell;
-             }
+
+                // Cell TTL handling
+
+                if (append.getTTL() != Long.MAX_VALUE) {
+                  List<Tag> newTags = new ArrayList<Tag>(1);
+                  newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+                  // Add the new TTL tag
+                  newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
+                      cell.getRowLength(),
+                    cell.getFamilyArray(), cell.getFamilyOffset(),
+                      cell.getFamilyLength(),
+                    cell.getQualifierArray(), cell.getQualifierOffset(),
+                      cell.getQualifierLength(),
+                    cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
+                    cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+                    newTags);
+                } else {
+                  newCell = cell;
+                }
+              }
+
               CellUtil.setSequenceId(newCell, mvccNum);
               // Give coprocessors a chance to update the new cell
               if (coprocessorHost != null) {
@@ -5573,6 +5680,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
 
+  // TODO: There's a lot of boilerplate code identical to append.
+  // We should refactor append and increment as local get-mutate-put
+  // transactions, so all stores only go through one code path for puts.
   /**
    * Perform one or more increment operations on a row.
    * @return new keyvalues after increment
@@ -5645,13 +5755,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
             // Iterate the input columns and update existing values if they were
             // found, otherwise add new column initialized to the increment amount
             int idx = 0;
-            for (Cell kv: family.getValue()) {
-              long amount = Bytes.toLong(CellUtil.cloneValue(kv));
+            for (Cell cell: family.getValue()) {
+              long amount = Bytes.toLong(CellUtil.cloneValue(cell));
               boolean noWriteBack = (amount == 0);
+              List<Tag> newTags = new ArrayList<Tag>();
+
+              // Carry forward any tags that might have been added by a coprocessor
+              if (cell.getTagsLength() > 0) {
+                Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
+                  cell.getTagsOffset(), cell.getTagsLength());
+                while (i.hasNext()) {
+                  newTags.add(i.next());
+                }
+              }
 
               Cell c = null;
               long ts = now;
-              if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
+              if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
                 c = results.get(idx);
                 ts = Math.max(now, c.getTimestamp());
                 if(c.getValueLength() == Bytes.SIZEOF_LONG) {
@@ -5661,32 +5781,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
                   throw new org.apache.hadoop.hbase.DoNotRetryIOException(
                       "Attempted to increment field that isn't 64 bits wide");
                 }
+                // Carry tags forward from previous version
+                if (c.getTagsLength() > 0) {
+                  Iterator<Tag> i = CellUtil.tagsIterator(c.getTagsArray(),
+                    c.getTagsOffset(), c.getTagsLength());
+                  while (i.hasNext()) {
+                    newTags.add(i.next());
+                  }
+                }
                 idx++;
               }
 
               // Append new incremented KeyValue to list
-              byte[] q = CellUtil.cloneQualifier(kv);
+              byte[] q = CellUtil.cloneQualifier(cell);
               byte[] val = Bytes.toBytes(amount);
-              int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
-              int incCellTagsLen = kv.getTagsLength();
-              Cell newKV = new KeyValue(row.length, family.getKey().length, q.length, ts,
-                  KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
-              System.arraycopy(row, 0, newKV.getRowArray(), newKV.getRowOffset(), row.length);
-              System.arraycopy(family.getKey(), 0, newKV.getFamilyArray(), newKV.getFamilyOffset(),
-                  family.getKey().length);
-              System.arraycopy(q, 0, newKV.getQualifierArray(), newKV.getQualifierOffset(), q.length);
-              // copy in the value
-              System.arraycopy(val, 0, newKV.getValueArray(), newKV.getValueOffset(), val.length);
-              // copy tags
-              if (oldCellTagsLen > 0) {
-                System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getTagsArray(),
-                    newKV.getTagsOffset(), oldCellTagsLen);
-              }
-              if (incCellTagsLen > 0) {
-                System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
-                    newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
+
+              // Add the TTL tag if the mutation carried one
+              if (increment.getTTL() != Long.MAX_VALUE) {
+                newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
               }
+
+              Cell newKV = new KeyValue(row, 0, row.length,
+                family.getKey(), 0, family.getKey().length,
+                q, 0, q.length,
+                ts,
+                KeyValue.Type.Put,
+                val, 0, val.length,
+                newTags);
+
               CellUtil.setSequenceId(newKV, mvccNum);
+
               // Give coprocessors a chance to update the new cell
               if (coprocessorHost != null) {
                 newKV = coprocessorHost.postMutationBeforeWAL(

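For orientation, here is a minimal client-side sketch of how the append/increment
TTL handling above is driven. It assumes an already-open Table instance (the
master/branch-1 client interface; 0.98 would use HTableInterface) and illustrative
row/fam/qual byte arrays; none of these names come from the patch itself.

    import java.io.IOException;
    import org.apache.hadoop.hbase.client.Append;
    import org.apache.hadoop.hbase.client.Increment;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CellTtlClientSketch {
      // The TTL set on the mutation is turned into a TTL tag on the
      // result cell by the HRegion code above.
      static void mutateWithTtl(Table table, byte[] row, byte[] fam, byte[] qual)
          throws IOException {
        Increment incr = new Increment(row);
        incr.addColumn(fam, qual, 1L);
        incr.setTTL(5000L); // the incremented cell expires ~5s after the write
        table.increment(incr);

        Append app = new Append(row);
        app.add(fam, qual, Bytes.toBytes("-suffix"));
        app.setTTL(5000L); // likewise for the appended cell
        table.append(app);
      }
    }
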
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 2adaaba..b3385d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.conf.ConfigurationManager;
@@ -1673,8 +1675,38 @@ public class HStore implements Store {
     return wantedVersions > maxVersions ? maxVersions: wantedVersions;
   }
 
-  static boolean isExpired(final Cell key, final long oldestTimestamp) {
-    return key.getTimestamp() < oldestTimestamp;
+  /**
+   * @param cell the cell to check
+   * @param oldestTimestamp the oldest timestamp still allowed by the family TTL
+   * @param now the current time, in milliseconds
+   * @return true if the cell is expired by a cell-level TTL tag
+   */
+  static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
+    // Do not create Iterator or Tag objects unless the cell actually has
+    // tags
+    if (cell.getTagsLength() > 0) {
+      // Look for a TTL tag first. Use it instead of the family setting if
+      // found. If a cell has multiple TTLs, resolve the conflict by using the
+      // first tag encountered.
+      Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+        cell.getTagsLength());
+      while (i.hasNext()) {
+        Tag t = i.next();
+        if (TagType.TTL_TAG_TYPE == t.getType()) {
+          // Unlike the family TTL in the schema, which is in seconds, cell
+          // TTLs are stored in milliseconds, so no conversion is needed
+          long ts = cell.getTimestamp();
+          assert t.getTagLength() == Bytes.SIZEOF_LONG;
+          long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
+          if (ts + ttl < now) {
+            return true;
+          }
+          // A cell TTL cannot extend the lifetime beyond the family setting,
+          // so break out here; the family TTL check still applies elsewhere
+          break;
+        }
+      }
+    }
+    return false;
   }
 
   @Override

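Restating the expiry rule isCellTTLExpired() implements, combined with the
pre-existing family TTL horizon, as a standalone sketch (all values in
milliseconds; this is an illustration, not patch code):

    public class ExpiryRuleSketch {
      // A cell is excluded when EITHER its own TTL tag has elapsed OR the
      // family TTL has; a cell TTL can only shorten the lifetime allowed
      // by the family setting, never extend it.
      static boolean expired(long ts, long cellTtlMs, long familyTtlMs, long now) {
        boolean byCellTag = cellTtlMs > 0 && ts + cellTtlMs < now;
        boolean byFamily = ts < now - familyTtlMs; // ts < oldestUnexpiredTS
        return byCellTag || byFamily;
      }
    }
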
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index 56e6b8a..c46da2a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -102,6 +102,10 @@ public class ScanQueryMatcher {
   private final long earliestPutTs;
   private final long ttl;
 
+  /** The oldest timestamp we are interested in, based on TTL */
+  private final long oldestUnexpiredTS;
+  private final long now;
+
   /** readPoint over which the KVs are unconditionally included */
   protected long maxReadPointToTrackVersions;
 
@@ -154,7 +158,7 @@ public class ScanQueryMatcher {
    */
   public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
       ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
-      RegionCoprocessorHost regionCoprocessorHost) throws IOException {
+      long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
     this.tr = scan.getTimeRange();
     this.rowComparator = scanInfo.getComparator();
     this.regionCoprocessorHost = regionCoprocessorHost;
@@ -164,6 +168,9 @@ public class ScanQueryMatcher {
         scanInfo.getFamily());
     this.filter = scan.getFilter();
     this.earliestPutTs = earliestPutTs;
+    this.oldestUnexpiredTS = oldestUnexpiredTS;
+    this.now = now;
+
     this.maxReadPointToTrackVersions = readPointToUse;
     this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
     this.ttl = oldestUnexpiredTS;
@@ -218,18 +225,18 @@ public class ScanQueryMatcher {
    * @param scanInfo The store's immutable scan info
    * @param columns
    * @param earliestPutTs Earliest put seen in any of the store files.
-   * @param oldestUnexpiredTS the oldest timestamp we are interested in,
-   *  based on TTL
+   * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
+   * @param now the current server time
    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
    * @param regionCoprocessorHost 
    * @throws IOException 
    */
   public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
-      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, byte[] dropDeletesFromRow,
+      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow,
       byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
     this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs,
-        oldestUnexpiredTS, regionCoprocessorHost);
+        oldestUnexpiredTS, now, regionCoprocessorHost);
     Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null));
     this.dropDeletesFromRow = dropDeletesFromRow;
     this.dropDeletesToRow = dropDeletesToRow;
@@ -239,10 +246,10 @@ public class ScanQueryMatcher {
    * Constructor for tests
    */
   ScanQueryMatcher(Scan scan, ScanInfo scanInfo,
-      NavigableSet<byte[]> columns, long oldestUnexpiredTS) throws IOException {
+      NavigableSet<byte[]> columns, long oldestUnexpiredTS, long now) throws IOException {
     this(scan, scanInfo, columns, ScanType.USER_SCAN,
           Long.MAX_VALUE, /* max Readpoint to track versions */
-        HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, null);
+        HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, null);
   }
 
   /**
@@ -300,12 +307,17 @@ public class ScanQueryMatcher {
 
     int qualifierOffset = cell.getQualifierOffset();
     int qualifierLength = cell.getQualifierLength();
+
     long timestamp = cell.getTimestamp();
     // check for early out based on timestamp alone
     if (columns.isDone(timestamp)) {
       return columns.getNextRowOrNextColumn(cell.getQualifierArray(), qualifierOffset,
           qualifierLength);
     }
+    // check if the cell is expired by cell TTL
+    if (HStore.isCellTTLExpired(cell, this.oldestUnexpiredTS, this.now)) {
+      return MatchCode.SKIP;
+    }
 
     /*
      * The delete logic is pretty complicated now.

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index ab7370c..9db116e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -77,6 +77,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   protected final Scan scan;
   protected final NavigableSet<byte[]> columns;
   protected final long oldestUnexpiredTS;
+  protected final long now;
   protected final int minVersions;
   protected final long maxRowSize;
 
@@ -123,7 +124,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     explicitColumnQuery = numCol > 0;
     this.scan = scan;
     this.columns = columns;
-    oldestUnexpiredTS = EnvironmentEdgeManager.currentTime() - ttl;
+    this.now = EnvironmentEdgeManager.currentTime();
+    this.oldestUnexpiredTS = now - ttl;
     this.minVersions = minVersions;
 
     if (store != null && ((HStore)store).getHRegion() != null
@@ -176,7 +178,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
-        oldestUnexpiredTS, store.getCoprocessorHost());
+        oldestUnexpiredTS, now, store.getCoprocessorHost());
 
     this.store.addChangedReaderObserver(this);
 
@@ -241,10 +243,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
         ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
     if (dropDeletesFromRow == null) {
       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
-          earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost());
+          earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
     } else {
       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
-          oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
+          oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
     }
 
     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
@@ -284,7 +286,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
         scanInfo.getMinVersions(), readPt);
     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
-        Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null);
+        Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
 
     // In unit tests, the store could be null
     if (this.store != null) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
new file mode 100644
index 0000000..7ea69f2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestImportTSVWithTTLs implements Configurable {
+
+  protected static final Log LOG = LogFactory.getLog(TestImportTSVWithTTLs.class);
+  protected static final String NAME = TestImportTSVWithTTLs.class.getSimpleName();
+  protected static HBaseTestingUtility util = new HBaseTestingUtility();
+
+  /**
+   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
+   * true.
+   */
+  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
+
+  /**
+   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
+   */
+  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
+
+  private final String FAMILY = "FAM";
+  private static Configuration conf;
+
+  @Override
+  public Configuration getConf() {
+    return util.getConfiguration();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    throw new IllegalArgumentException("setConf not supported");
+  }
+
+  @BeforeClass
+  public static void provisionCluster() throws Exception {
+    conf = util.getConfiguration();
+    // We don't check persistence in HFiles in this test, but if we ever do, we
+    // will need this on branches where the default HFile version is not 3 (i.e. 0.98)
+    conf.setInt("hfile.format.version", 3);
+    conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName());
+    util.startMiniCluster();
+    util.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void releaseCluster() throws Exception {
+    util.shutdownMiniMapReduceCluster();
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMROnTable() throws Exception {
+    String tableName = "test-" + UUID.randomUUID();
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.MAPPER_CONF_KEY
+            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
+    String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n";
+    util.createTable(TableName.valueOf(tableName), FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1);
+    util.deleteTable(tableName);
+  }
+
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+      String[] args, int valueMultiplier) throws Exception {
+    TableName table = TableName.valueOf(args[args.length - 1]);
+    Configuration conf = new Configuration(util.getConfiguration());
+
+    // populate input file
+    FileSystem fs = FileSystem.get(conf);
+    Path inputPath = fs.makeQualified(new Path(util
+        .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
+    FSDataOutputStream op = fs.create(inputPath, true);
+    op.write(Bytes.toBytes(data));
+    op.close();
+    LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+      LOG.debug("Forcing combiner.");
+      conf.setInt("mapreduce.map.combine.minspills", 1);
+    }
+
+    // run the import
+    List<String> argv = new ArrayList<String>(Arrays.asList(args));
+    argv.add(inputPath.toString());
+    Tool tool = new ImportTsv();
+    LOG.debug("Running ImportTsv with arguments: " + argv);
+    try {
+      // Job will fail if observer rejects entries without TTL
+      assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(new String[argv.size()])));
+    } finally {
+      // Clean up
+      if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+        LOG.debug("Deleting test subdirectory");
+        util.cleanupDataTestDirOnTestFS(table.getNameAsString());
+      }
+    }
+
+    return tool;
+  }
+
+  public static class TTLCheckingObserver extends BaseRegionObserver {
+
+    @Override
+    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
+        Durability durability) throws IOException {
+      HRegion region = e.getEnvironment().getRegion();
+      if (!region.getRegionInfo().isMetaTable()
+          && !region.getRegionInfo().getTable().isSystemTable()) {
+        // The put carries the TTL attribute
+        if (put.getTTL() != Long.MAX_VALUE) {
+          return;
+        }
+        throw new IOException("Operation does not have TTL set");
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 2f3e59a..4d74cd8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -91,6 +92,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
@@ -112,6 +114,7 @@ import org.apache.hadoop.hbase.filter.NullComparator;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -139,6 +142,7 @@ import org.apache.hadoop.hbase.wal.WALSplitter;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
@@ -5766,6 +5770,133 @@ public class TestHRegion {
     }
   }
 
+  @Test
+  public void testCellTTLs() throws IOException {
+    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(edge);
+
+    final byte[] row = Bytes.toBytes("testRow");
+    final byte[] q1 = Bytes.toBytes("q1");
+    final byte[] q2 = Bytes.toBytes("q2");
+    final byte[] q3 = Bytes.toBytes("q3");
+    final byte[] q4 = Bytes.toBytes("q4");
+
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testCellTTLs"));
+    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
+    hcd.setTimeToLive(10); // 10 seconds
+    htd.addFamily(hcd);
+
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
+
+    HRegion region = HRegion.createHRegion(new HRegionInfo(htd.getTableName(),
+        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
+      TEST_UTIL.getDataTestDir(), conf, htd);
+    assertNotNull(region);
+    try {
+      long now = EnvironmentEdgeManager.currentTime();
+      // Add a cell that will expire in 5 seconds via cell TTL
+      region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
+        HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+          // TTL tags specify the TTL in milliseconds
+          new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+      // Add a cell that will expire after 10 seconds via family setting
+      region.put(new Put(row).add(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
+      // Add a cell that will expire in 15 seconds via cell TTL
+      region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
+        HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+          // TTL tags specify the TTL in milliseconds
+          new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+      // Add a cell that will expire in 20 seconds via family setting
+      region.put(new Put(row).add(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
+
+      // Flush so we are sure store scanning gets this right
+      region.flushcache();
+
+      // A query at time T+0 should return all cells
+      Result r = region.get(new Get(row));
+      assertNotNull(r.getValue(fam1, q1));
+      assertNotNull(r.getValue(fam1, q2));
+      assertNotNull(r.getValue(fam1, q3));
+      assertNotNull(r.getValue(fam1, q4));
+
+      // Increment time to T+5 seconds
+      edge.incrementTime(5000);
+
+      r = region.get(new Get(row));
+      assertNull(r.getValue(fam1, q1));
+      assertNotNull(r.getValue(fam1, q2));
+      assertNotNull(r.getValue(fam1, q3));
+      assertNotNull(r.getValue(fam1, q4));
+
+      // Increment time to T+10 seconds
+      edge.incrementTime(5000);
+
+      r = region.get(new Get(row));
+      assertNull(r.getValue(fam1, q1));
+      assertNull(r.getValue(fam1, q2));
+      assertNotNull(r.getValue(fam1, q3));
+      assertNotNull(r.getValue(fam1, q4));
+
+      // Increment time to T+15 seconds
+      edge.incrementTime(5000);
+
+      r = region.get(new Get(row));
+      assertNull(r.getValue(fam1, q1));
+      assertNull(r.getValue(fam1, q2));
+      assertNull(r.getValue(fam1, q3));
+      assertNotNull(r.getValue(fam1, q4));
+
+      // Increment time to T+20 seconds
+      edge.incrementTime(10000);
+
+      r = region.get(new Get(row));
+      assertNull(r.getValue(fam1, q1));
+      assertNull(r.getValue(fam1, q2));
+      assertNull(r.getValue(fam1, q3));
+      assertNull(r.getValue(fam1, q4));
+
+      // Fun with disappearing increments
+
+      // Start at 1
+      region.put(new Put(row).add(fam1, q1, Bytes.toBytes(1L)));
+      r = region.get(new Get(row));
+      byte[] val = r.getValue(fam1, q1);
+      assertNotNull(val);
+      assertEquals(1L, Bytes.toLong(val));
+
+      // Increment with a TTL of 5 seconds
+      Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
+      incr.setTTL(5000);
+      region.increment(incr); // 2
+
+      // New value should be 2
+      r = region.get(new Get(row));
+      val = r.getValue(fam1, q1);
+      assertNotNull(val);
+      assertEquals(2L, Bytes.toLong(val));
+
+      // Increment time to T+25 seconds
+      edge.incrementTime(5000);
+
+      // Value should be back to 1
+      r = region.get(new Get(row));
+      val = r.getValue(fam1, q1);
+      assertNotNull(val);
+      assertEquals(1L, Bytes.toLong(val));
+
+      // Increment time to T+30 seconds
+      edge.incrementTime(5000);
+
+      // Original value written at T+20 should be gone now via family TTL
+      r = region.get(new Get(row));
+      assertNull(r.getValue(fam1, q1));
+
+    } finally {
+      HRegion.closeHRegion(region);
+    }
+  }
+
   private static HRegion initHRegion(byte[] tableName, String callingMethod,
       byte[]... families) throws IOException {
     return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
index 35b71e7..d1270da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
@@ -91,11 +91,12 @@ public class TestQueryMatcher extends HBaseTestCase {
 
   }
 
-    private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
-    // 2,4,5    
+  private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
+    long now = EnvironmentEdgeManager.currentTime();
+    // 2,4,5
     ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
         0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), get.getFamilyMap().get(fam2),
-        EnvironmentEdgeManager.currentTime() - ttl);
+        now - ttl, now);
 
     List<KeyValue> memstore = new ArrayList<KeyValue>();
     memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -175,9 +176,10 @@ public class TestQueryMatcher extends HBaseTestCase {
     expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
     expected.add(ScanQueryMatcher.MatchCode.DONE);
 
+    long now = EnvironmentEdgeManager.currentTime();
     ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
         0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null,
-        EnvironmentEdgeManager.currentTime() - ttl);
+        now - ttl, now);
 
     List<KeyValue> memstore = new ArrayList<KeyValue>();
     memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -230,8 +232,8 @@ public class TestQueryMatcher extends HBaseTestCase {
 
     long now = EnvironmentEdgeManager.currentTime();
     ScanQueryMatcher qm =
-        new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
-            rowComparator), get.getFamilyMap().get(fam2), now - testTTL);
+      new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
+        rowComparator), get.getFamilyMap().get(fam2), now - testTTL, now);
 
     KeyValue [] kvs = new KeyValue[] {
         new KeyValue(row1, fam2, col1, now-100, data),
@@ -285,7 +287,7 @@ public class TestQueryMatcher extends HBaseTestCase {
     long now = EnvironmentEdgeManager.currentTime();
     ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
         0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null,
-        now - testTTL);
+        now - testTTL, now);
 
     KeyValue [] kvs = new KeyValue[] {
         new KeyValue(row1, fam2, col1, now-100, data),
@@ -343,7 +345,7 @@ public class TestQueryMatcher extends HBaseTestCase {
     NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
 
     ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,
-        HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to, null);
+        HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, now, from, to, null);
     List<ScanQueryMatcher.MatchCode> actual =
         new ArrayList<ScanQueryMatcher.MatchCode>(rows.length);
     byte[] prevRow = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
index 14bd9d6..bad9084 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
@@ -418,8 +418,13 @@ public class TestTags {
       tags = TestCoprocessorForTags.tags;
       assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
       assertEquals(2, tags.size());
-      assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
-      assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+      // We cannot assume the ordering of tags
+      List<String> tagValues = new ArrayList<String>();
+      for (Tag tag: tags) {
+        tagValues.add(Bytes.toString(tag.getValue()));
+      }
+      assertTrue(tagValues.contains("tag1"));
+      assertTrue(tagValues.contains("tag2"));
       TestCoprocessorForTags.checkTagPresence = false;
       TestCoprocessorForTags.tags = null;
 
@@ -475,8 +480,13 @@ public class TestTags {
       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
       tags = TestCoprocessorForTags.tags;
       assertEquals(2, tags.size());
-      assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
-      assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+      // We cannot assume the ordering of tags
+      tagValues.clear();
+      for (Tag tag: tags) {
+        tagValues.add(Bytes.toString(tag.getValue()));
+      }
+      assertTrue(tagValues.contains("tag1"));
+      assertTrue(tagValues.contains("tag2"));
       TestCoprocessorForTags.checkTagPresence = false;
       TestCoprocessorForTags.tags = null;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-shell/src/main/ruby/hbase/table.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/table.rb b/hbase-shell/src/main/ruby/hbase/table.rb
index 85da4ed..1ec1cc4 100644
--- a/hbase-shell/src/main/ruby/hbase/table.rb
+++ b/hbase-shell/src/main/ruby/hbase/table.rb
@@ -141,17 +141,19 @@ EOF
          set_attributes(p, attributes) if attributes
          visibility = args[VISIBILITY]
          set_cell_visibility(p, visibility) if visibility
+         ttl = args[TTL]
+         set_op_ttl(p, ttl) if ttl
       end
       #Case where attributes are specified without timestamp
       if timestamp.kind_of?(Hash)
       	timestamp.each do |k, v|
-      	  if v.kind_of?(Hash)
-      	  	set_attributes(p, v) if v
-      	  end
-      	  if v.kind_of?(String)
-      	  	set_cell_visibility(p, v) if v
-      	  end
-      	  
+          if k == 'ATTRIBUTES'
+            set_attributes(p, v)
+          elsif k == 'VISIBILITY'
+            set_cell_visibility(p, v)
+          elsif k == 'TTL'
+            set_op_ttl(p, v)
+          end
         end
         timestamp = nil
       end  
@@ -219,6 +221,8 @@ EOF
       	visibility = args[VISIBILITY]
         set_attributes(incr, attributes) if attributes
         set_cell_visibility(incr, visibility) if visibility
+        ttl = args[TTL]
+        set_op_ttl(incr, ttl) if ttl
       end
       incr.addColumn(family, qualifier, value)
       @table.increment(incr)
@@ -237,6 +241,8 @@ EOF
       	visibility = args[VISIBILITY]
         set_attributes(append, attributes) if attributes
         set_cell_visibility(append, visibility) if visibility
+        ttl = args[TTL]
+        set_op_ttl(append, ttl) if ttl
       end
       append.add(family, qualifier, value.to_s.to_java_bytes)
       @table.append(append)
@@ -545,6 +551,10 @@ EOF
           auths.to_java(:string)))
     end
 
+    def set_op_ttl(op, ttl)
+      op.setTTL(ttl.to_java(:long))
+    end
+
     #----------------------------
     # Add general administration utilities to the shell
     # each of the names below adds this method name to the table

http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-shell/src/test/ruby/hbase/table_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/table_test.rb b/hbase-shell/src/test/ruby/hbase/table_test.rb
index 7272229..fa2990d 100644
--- a/hbase-shell/src/test/ruby/hbase/table_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/table_test.rb
@@ -530,5 +530,18 @@ module Hbase
       end
     end
 
+    define_test "mutation with TTL should expire" do
+      @test_table.put('ttlTest', 'x:a', 'foo', { TTL => 1000 })
+      begin
+        res = @test_table._get_internal('ttlTest', 'x:a')
+        assert_not_nil(res)
+        sleep 2
+        res = @test_table._get_internal('ttlTest', 'x:a')
+        assert_nil(res)
+      ensure
+        @test_table.delete('ttlTest', 'x:a')
+      end
+    end
+
   end
 end


[3/3] hbase git commit: HBASE-10560 Per cell TTLs

Posted by ap...@apache.org.
HBASE-10560 Per cell TTLs


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/869c5665
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/869c5665
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/869c5665

Branch: refs/heads/0.98
Commit: 869c5665cd4cc0cd7994933e839154002f682f44
Parents: 45c8be3
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Dec 5 11:00:45 2014 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Dec 5 11:53:59 2014 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/client/Delete.java  |   5 +
 .../apache/hadoop/hbase/client/Mutation.java    |  34 +++
 .../java/org/apache/hadoop/hbase/TagType.java   |   1 +
 .../hadoop/hbase/mapreduce/CellCreator.java     |  35 ++-
 .../hadoop/hbase/mapreduce/ImportTsv.java       |  55 +++-
 .../hadoop/hbase/mapreduce/TextSortReducer.java |  24 +-
 .../hbase/mapreduce/TsvImporterMapper.java      |  26 +-
 .../GetClosestRowBeforeTracker.java             |  26 +-
 .../hadoop/hbase/regionserver/HRegion.java      | 255 ++++++++++++++-----
 .../hadoop/hbase/regionserver/HStore.java       |  37 ++-
 .../hbase/regionserver/ScanQueryMatcher.java    |  30 ++-
 .../hadoop/hbase/regionserver/StoreScanner.java |  12 +-
 .../hbase/mapreduce/TestImportTSVWithTTLs.java  | 171 +++++++++++++
 .../hadoop/hbase/regionserver/TestHRegion.java  | 131 ++++++++++
 .../hbase/regionserver/TestQueryMatcher.java    |  16 +-
 .../hadoop/hbase/regionserver/TestTags.java     |  18 +-
 hbase-shell/src/main/ruby/hbase/table.rb        |  24 +-
 hbase-shell/src/test/ruby/hbase/table_test.rb   |  13 +
 18 files changed, 794 insertions(+), 119 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
index f1c954f..8a47e44 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
@@ -323,4 +323,9 @@ public class Delete extends Mutation implements Comparable<Row> {
     map.put("ts", this.ts);
     return map;
   }
+
+  @Override
+  public Delete setTTL(long ttl) {
+    throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported");
+  }
 }

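A small sketch of the client-visible contract above: deletes reject TTLs
outright, since a delete marker has no result cell whose lifetime could be
limited ('row1' is a placeholder, not from the patch):

    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.util.Bytes;

    public class DeleteTtlSketch {
      public static void main(String[] args) {
        try {
          new Delete(Bytes.toBytes("row1")).setTTL(5000L); // always throws
        } catch (UnsupportedOperationException expected) {
          // expected: TTLs on Deletes are not supported
        }
      }
    }
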
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index 869d940..ec41568 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -78,6 +78,11 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
    */
   private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
 
+  /**
+   * The attribute for storing TTL for the result of the mutation.
+   */
+  private static final String OP_ATTRIBUTE_TTL = "_ttl";
+
   protected byte [] row = null;
   protected long ts = HConstants.LATEST_TIMESTAMP;
   protected Durability durability = Durability.USE_DEFAULT;
@@ -206,6 +211,12 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
     if (getId() != null) {
       map.put("id", getId());
     }
+    // Add the TTL if set
+    // Long.MAX_VALUE is the default, and is interpreted to mean this attribute
+    // has not been set.
+    if (getTTL() != Long.MAX_VALUE) {
+      map.put("ttl", getTTL());
+    }
     return map;
   }
 
@@ -470,6 +481,29 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
   }
 
   /**
+   * Return the TTL requested for the result of the mutation, in milliseconds.
+   * @return the TTL requested for the result of the mutation, in milliseconds,
+   * or Long.MAX_VALUE if unset
+   */
+  public long getTTL() {
+    byte[] ttlBytes = getAttribute(OP_ATTRIBUTE_TTL);
+    if (ttlBytes != null) {
+      return Bytes.toLong(ttlBytes);
+    }
+    return Long.MAX_VALUE;
+  }
+
+  /**
+   * Set the TTL desired for the result of the mutation, in milliseconds.
+   * @param ttl the TTL desired for the result of the mutation, in milliseconds
+   * @return this
+   */
+  public Mutation setTTL(long ttl) {
+    setAttribute(OP_ATTRIBUTE_TTL, Bytes.toBytes(ttl));
+    return this;
+  }
+
+  /**
    * Subclasses should override this method to add the heap size of their own fields.
    * @return the heap size to add (will be aligned).
    */

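From a caller's perspective, the new accessors behave as in this minimal
sketch (the Put.add() signature is the 0.98/branch-1 era API; names and
values are illustrative):

    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MutationTtlSketch {
      public static void main(String[] args) {
        Put p = new Put(Bytes.toBytes("row1"));
        p.add(Bytes.toBytes("f"), Bytes.toBytes("q"), Bytes.toBytes("v"));
        p.setTTL(60000L); // rides along as the "_ttl" attribute, in milliseconds
        assert p.getTTL() == 60000L;
        // A mutation that never had setTTL() called reports Long.MAX_VALUE,
        // which every consumer treats as "no cell TTL".
        assert new Put(Bytes.toBytes("row2")).getTTL() == Long.MAX_VALUE;
      }
    }
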
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index b113516..65b2c91 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -30,4 +30,5 @@ public final class TagType {
   public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
   // String based tag type used in replication
   public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
+  public static final byte TTL_TAG_TYPE = (byte)8;
 }

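The shape of the new tag, sketched with the same Tag and KeyValue
constructors the patch uses in TestHRegion: a TTL tag is the TTL_TAG_TYPE
byte followed by an 8-byte long of milliseconds (values here are
illustrative):

    import org.apache.hadoop.hbase.KeyValue;
    import org.apache.hadoop.hbase.Tag;
    import org.apache.hadoop.hbase.TagType;
    import org.apache.hadoop.hbase.util.Bytes;

    public class TtlTagSketch {
      public static void main(String[] args) {
        Tag ttlTag = new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L));
        KeyValue kv = new KeyValue(Bytes.toBytes("r"), Bytes.toBytes("f"),
            Bytes.toBytes("q"), 1L, Bytes.toBytes("v"), new Tag[] { ttlTag });
        assert kv.getTagsLength() > 0;
      }
    }
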
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
index b3dfee7..001f64d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
@@ -69,7 +69,7 @@ public class CellCreator {
       byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
       int vlength) throws IOException {
     return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength,
-        timestamp, value, voffset, vlength, null);
+        timestamp, value, voffset, vlength, (List<Tag>)null);
   }
 
   /**
@@ -90,6 +90,7 @@ public class CellCreator {
    * @return created Cell
    * @throws IOException
    */
+  @Deprecated
   public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
       byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
       int vlength, String visExpression) throws IOException {
@@ -100,4 +101,36 @@ public class CellCreator {
     return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
         qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags);
   }
+
+  /**
+   * @param row row key
+   * @param roffset row offset
+   * @param rlength row length
+   * @param family family name
+   * @param foffset family offset
+   * @param flength family length
+   * @param qualifier column qualifier
+   * @param qoffset qualifier offset
+   * @param qlength qualifier length
+   * @param timestamp version timestamp
+   * @param value column value
+   * @param voffset value offset
+   * @param vlength value length
+   * @param tags tags to include in the created cell, may be null or empty
+   * @return created Cell
+   * @throws IOException
+   */
+  public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+      byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+      int vlength, List<Tag> tags) throws IOException {
+    return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
+        qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags);
+  }
+
+  /**
+   * @return Visibility expression resolver
+   */
+  public VisibilityExpressionResolver getVisibilityExpressionResolver() {
+    return this.visExpResolver;
+  }
 }

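How a bulk-load path might call the new tag-aware create() overload,
sketched; kvCreator stands in for an initialized CellCreator as in
TextSortReducer below, and the zero offsets assume whole arrays:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.Tag;
    import org.apache.hadoop.hbase.TagType;
    import org.apache.hadoop.hbase.mapreduce.CellCreator;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CellCreatorTtlSketch {
      static Cell createWithTtl(CellCreator kvCreator, byte[] row, byte[] fam,
          byte[] qual, byte[] value, long ts, long ttlMs) throws IOException {
        List<Tag> tags = new ArrayList<Tag>();
        tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttlMs)));
        return kvCreator.create(row, 0, row.length, fam, 0, fam.length,
            qual, 0, qual.length, ts, value, 0, value.length, tags);
      }
    }
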
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index a1f84bb..6c154f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -121,13 +121,20 @@ public class ImportTsv extends Configured implements Tool {
 
     public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
 
+    public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL";
+
     private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
 
     public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
 
     public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
 
+    public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1;
+
     private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+
+    private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX;
+
     /**
      * @param columnsSpecification the list of columns to parser out, comma separated.
      * The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
@@ -158,14 +165,18 @@ public class ImportTsv extends Configured implements Tool {
           timestampKeyColumnIndex = i;
           continue;
         }
-        if(ATTRIBUTES_COLUMN_SPEC.equals(str)) {
+        if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
           attrKeyColumnIndex = i;
           continue;
         }
-        if(CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
+        if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
           cellVisibilityColumnIndex = i;
           continue;
         }
+        if (CELL_TTL_COLUMN_SPEC.equals(str)) {
+          cellTTLColumnIndex = i;
+          continue;
+        }
         String[] parts = str.split(":", 2);
         if (parts.length == 1) {
           families[i] = str.getBytes();
@@ -193,6 +204,10 @@ public class ImportTsv extends Configured implements Tool {
       return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
     }
 
+    public boolean hasCellTTL() {
+      return cellTTLColumnIndex != DEFAULT_CELL_TTL_COLUMN_INDEX;
+    }
+
     public int getAttributesKeyColumnIndex() {
       return attrKeyColumnIndex;
     }
@@ -200,9 +215,15 @@ public class ImportTsv extends Configured implements Tool {
     public int getCellVisibilityColumnIndex() {
       return cellVisibilityColumnIndex;
     }
+
+    public int getCellTTLColumnIndex() {
+      return cellTTLColumnIndex;
+    }
+
     public int getRowKeyColumnIndex() {
       return rowKeyColumnIndex;
     }
+
     public byte[] getFamily(int idx) {
       return families[idx];
     }
@@ -234,8 +255,10 @@ public class ImportTsv extends Configured implements Tool {
         throw new BadTsvLineException("No timestamp");
       } else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
         throw new BadTsvLineException("No attributes specified");
-      } else if(hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
+      } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
         throw new BadTsvLineException("No cell visibility specified");
+      } else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) {
+        throw new BadTsvLineException("No cell TTL specified");
       }
       return new ParsedLine(tabOffsets, lineBytes);
     }
@@ -332,6 +355,31 @@ public class ImportTsv extends Configured implements Tool {
         }
       }
 
+      public int getCellTTLColumnOffset() {
+        if (hasCellTTL()) {
+          return getColumnOffset(cellTTLColumnIndex);
+        } else {
+          return DEFAULT_CELL_TTL_COLUMN_INDEX;
+        }
+      }
+
+      public int getCellTTLColumnLength() {
+        if (hasCellTTL()) {
+          return getColumnLength(cellTTLColumnIndex);
+        } else {
+          return DEFAULT_CELL_TTL_COLUMN_INDEX;
+        }
+      }
+
+      public long getCellTTL() {
+        if (!hasCellTTL()) {
+          return 0;
+        } else {
+          return Bytes.toLong(lineBytes, getColumnOffset(cellTTLColumnIndex),
+              getColumnLength(cellTTLColumnIndex));
+        }
+      }
+
       public int getColumnOffset(int idx) {
         if (idx > 0)
           return tabOffsets.get(idx - 1) + 1;
@@ -489,6 +537,7 @@ public class ImportTsv extends Configured implements Tool {
       if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
           || TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
           || TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
+          || TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn)
           || TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
         continue;
       // we are only concerned with the first one (in case this is a cf:cq)

http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
index 4a0e0fd..b3981a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -18,7 +18,9 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
 
@@ -28,8 +30,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -62,6 +67,9 @@ public class TextSortReducer extends
   /** Cell visibility expr **/
   private String cellVisibilityExpr;
 
+  /** Cell TTL */
+  private long ttl;
+
   private CellCreator kvCreator;
 
   public long getTs() {
@@ -148,18 +156,30 @@ public class TextSortReducer extends
           // Retrieve timestamp if exists
           ts = parsed.getTimestamp(ts);
           cellVisibilityExpr = parsed.getCellVisibility();
+          ttl = parsed.getCellTTL();
 
           for (int i = 0; i < parsed.getColumnCount(); i++) {
             if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
-                || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
+                || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
+                || i == parser.getCellTTLColumnIndex()) {
               continue;
             }
             // Creating the KV which needs to be directly written to HFiles. Using the Facade
             // KVCreator for creation of kvs.
+            List<Tag> tags = new ArrayList<Tag>();
+            if (cellVisibilityExpr != null) {
+              tags.addAll(kvCreator.getVisibilityExpressionResolver()
+                .createVisibilityExpTags(cellVisibilityExpr));
+            }
+            // Add the TTL directly to the KV so that TTLs can vary per KV
+            // when more than one KV is packed into a put
+            if (ttl > 0) {
+              tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+            }
             Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
                 parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
                 parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
-                parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
+                parsed.getColumnOffset(i), parsed.getColumnLength(i), tags);
             KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
             kvs.add(kv);
             curSize += kv.heapSize();

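The reducer writes KeyValues straight into HFiles, so the TTL cannot travel as a Put attribute here; it has to be materialized as a tag on each cell up front. A minimal standalone sketch of that tag construction, using the same Tag[] KeyValue constructor the TestHRegion changes below rely on (row, fam, qual, ts, value and the 5 second TTL are hypothetical):

    long ttlMillis = 5000L; // cell TTLs are expressed in milliseconds
    KeyValue kv = new KeyValue(row, fam, qual, ts, value,
        new Tag[] { new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttlMillis)) });
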
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
index ff84081..270de75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
@@ -18,17 +18,22 @@
 package org.apache.hadoop.hbase.mapreduce;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
 import org.apache.hadoop.hbase.security.visibility.CellVisibility;
 import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Counter;
@@ -59,6 +64,8 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
 
   protected String cellVisibilityExpr;
 
+  protected long ttl;
+
   protected CellCreator kvCreator;
 
   private String hfileOutPath;
@@ -144,11 +151,13 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
       // Retrieve timestamp if exists
       ts = parsed.getTimestamp(ts);
       cellVisibilityExpr = parsed.getCellVisibility();
+      ttl = parsed.getCellTTL();
 
       Put put = new Put(rowKey.copyBytes());
       for (int i = 0; i < parsed.getColumnCount(); i++) {
         if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
-            || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
+            || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
+            || i == parser.getCellTTLColumnIndex()) {
           continue;
         }
         populatePut(lineBytes, parsed, put, i);
@@ -192,13 +201,26 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
         // the validation
         put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
       }
+      if (ttl > 0) {
+        put.setTTL(ttl);
+      }
     } else {
       // Creating the KV which needs to be directly written to HFiles. Using the Facade
       // KVCreator for creation of kvs.
+      List<Tag> tags = new ArrayList<Tag>();
+      if (cellVisibilityExpr != null) {
+        tags.addAll(kvCreator.getVisibilityExpressionResolver()
+          .createVisibilityExpTags(cellVisibilityExpr));
+      }
+      // Add the TTL directly to the KV as a tag so TTLs can vary per KV
+      // when packing more than one KV into a put
+      if (ttl > 0) {
+        tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+      }
       cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
           parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
           parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
-          parsed.getColumnLength(i), cellVisibilityExpr);
+          parsed.getColumnLength(i), tags);
     }
     put.add(cell);
   }

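When the mapper emits Puts instead of writing HFiles directly, the TTL travels as an operation attribute and is rewritten into a per-cell tag on the server side (see the HRegion.rewriteCellTags() change below). A minimal client-side sketch of that path, assuming an HTable handle named table and hypothetical row/family/qualifier/value byte arrays:

    Put put = new Put(row);
    put.add(family, qualifier, value);
    put.setTTL(5000L); // milliseconds; becomes a TTL_TAG_TYPE tag in the region
    table.put(put);
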
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
index d37aa5a..876aae0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
@@ -24,6 +24,7 @@ import java.util.TreeMap;
 import java.util.TreeSet;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.KeyValue.KVComparator;
@@ -31,7 +32,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 
 /**
  * State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}.
- * Like {@link ScanDeleteTracker} and {@link ScanDeleteTracker} but does not
+ * Like {@link ScanQueryMatcher} and {@link ScanDeleteTracker} but does not
  * implement the {@link DeleteTracker} interface since state spans rows (There
  * is no update nor reset method).
  */
@@ -39,7 +40,8 @@ import org.apache.hadoop.hbase.util.Bytes;
 class GetClosestRowBeforeTracker {
   private final KeyValue targetkey;
   // Any cell w/ a ts older than this is expired.
-  private final long oldestts;
+  private final long now;
+  private final long oldestUnexpiredTs;
   private KeyValue candidate = null;
   private final KVComparator kvcomparator;
   // Flag for whether we're doing getclosest on a metaregion.
@@ -72,20 +74,13 @@ class GetClosestRowBeforeTracker {
         HConstants.DELIMITER) - this.rowoffset;
     }
     this.tablenamePlusDelimiterLength = metaregion? l + 1: -1;
-    this.oldestts = System.currentTimeMillis() - ttl;
+    this.now = System.currentTimeMillis();
+    this.oldestUnexpiredTs = now - ttl;
     this.kvcomparator = c;
     KeyValue.RowOnlyComparator rc = new KeyValue.RowOnlyComparator(this.kvcomparator);
     this.deletes = new TreeMap<KeyValue, NavigableSet<KeyValue>>(rc);
   }
 
-  /**
-   * @param kv
-   * @return True if this <code>kv</code> is expired.
-   */
-  boolean isExpired(final KeyValue kv) {
-    return HStore.isExpired(kv, this.oldestts);
-  }
-
   /*
    * Add the specified KeyValue to the list of deletes.
    * @param kv
@@ -170,6 +165,15 @@ class GetClosestRowBeforeTracker {
     return false;
   }
 
+  /**
+   * @param cell
+   * @return true if the cell is expired
+   */
+  public boolean isExpired(final Cell cell) {
+    return cell.getTimestamp() < this.oldestUnexpiredTs ||
+      HStore.isCellTTLExpired(cell, this.oldestUnexpiredTs, this.now);
+  }
+
   /*
    * Handle keys whose values hold deletes.
    * Add to the set of deletes and then if the candidate keys contain any that

http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 1d554fc..777dcfe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableMap;
@@ -84,6 +85,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.RegionTooBusyException;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.UnknownScannerException;
 import org.apache.hadoop.hbase.backup.HFileArchiver;
 import org.apache.hadoop.hbase.client.Append;
@@ -2465,6 +2468,7 @@ public class HRegion implements HeapSize { // , Writable{
           }
           noOfDeletes++;
         }
+        rewriteCellTags(familyMaps[i], mutation);
       }
 
       lock(this.updatesLock.readLock(), numReadyToWrite);
@@ -2877,6 +2881,7 @@ public class HRegion implements HeapSize { // , Writable{
       closeRegionOperation();
     }
   }
+
   private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
     // Currently this is only called for puts and deletes, so no nonces.
     OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
@@ -2928,6 +2933,59 @@ public class HRegion implements HeapSize { // , Writable{
     }
   }
 
+  /**
+   * Possibly rewrite incoming cell tags.
+   */
+  void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
+    // Check if we have any work to do and early out otherwise
+    // Update these checks as more logic is added here
+
+    if (m.getTTL() == Long.MAX_VALUE) {
+      return;
+    }
+
+    // From this point we know we have some work to do
+
+    for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
+      List<Cell> cells = e.getValue();
+      assert cells instanceof RandomAccess;
+      int listSize = cells.size();
+      for (int i = 0; i < listSize; i++) {
+        Cell cell = cells.get(i);
+        List<Tag> newTags = new ArrayList<Tag>();
+        Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
+          cell.getTagsOffset(), cell.getTagsLengthUnsigned());
+
+        // Carry forward existing tags
+
+        while (tagIterator.hasNext()) {
+
+          // Add any filters or tag specific rewrites here
+
+          newTags.add(tagIterator.next());
+        }
+
+        // Cell TTL handling
+
+        // Check again if we need to add a cell TTL because early out logic
+        // above may change when there are more tag based features in core.
+        if (m.getTTL() != Long.MAX_VALUE) {
+          // Add a cell TTL tag
+          newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
+        }
+
+        // Rewrite the cell with the updated set of tags
+
+        cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+          cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+          cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+          cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
+          cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+          newTags));
+      }
+    }
+  }
+
   /*
    * Check if resources to support an update.
    *
@@ -5047,6 +5105,9 @@ public class HRegion implements HeapSize { // , Writable{
           processor.preBatchMutate(this, walEdit);
           // 7. Apply to memstore
           for (Mutation m : mutations) {
+            // Handle any tag based cell features
+            rewriteCellTags(m.getFamilyCellMap(), m);
+
             for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
               KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
               kv.setMvccVersion(writeEntry.getWriteNumber());
@@ -5173,8 +5234,9 @@ public class HRegion implements HeapSize { // , Writable{
     return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
 
-  // TODO: There's a lot of boiler plate code identical
-  // to increment... See how to better unify that.
+  // TODO: There's a lot of boiler plate code identical to increment.
+  // We should refactor append and increment as local get-mutate-put
+  // transactions, so all stores only go through one code path for puts.
   /**
    * Perform one or more append operations on a row.
    *
@@ -5242,67 +5304,111 @@ public class HRegion implements HeapSize { // , Writable{
             // Iterate the input columns and update existing values if they were
             // found, otherwise add new column initialized to the append value
 
-            // Avoid as much copying as possible. Every byte is copied at most
-            // once.
+            // Avoid as much copying as possible. We may need to rewrite and
+            // consolidate tags. Bytes are only copied once.
             // Would be nice if KeyValue had scatter/gather logic
             int idx = 0;
             for (Cell cell : family.getValue()) {
               KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
-              KeyValue newKV;
+              KeyValue newKv;
               KeyValue oldKv = null;
               if (idx < results.size()
-                  && CellUtil.matchingQualifier(results.get(idx),kv)) {
+                  && CellUtil.matchingQualifier(results.get(idx), kv)) {
                 oldKv = KeyValueUtil.ensureKeyValue(results.get(idx));
-                // allocate an empty kv once
                 long ts = Math.max(now, oldKv.getTimestamp());
-                newKV = new KeyValue(row.length, kv.getFamilyLength(),
+
+                // Process cell tags
+                List<Tag> newTags = new ArrayList<Tag>();
+
+                // Make a union of the set of tags in the old and new KVs
+
+                if (oldKv.getTagsLengthUnsigned() > 0) {
+                  Iterator<Tag> i = CellUtil.tagsIterator(oldKv.getTagsArray(),
+                    oldKv.getTagsOffset(), oldKv.getTagsLengthUnsigned());
+                  while (i.hasNext()) {
+                    newTags.add(i.next());
+                  }
+                }
+                if (kv.getTagsLengthUnsigned() > 0) {
+                  Iterator<Tag> i  = CellUtil.tagsIterator(kv.getTagsArray(), kv.getTagsOffset(),
+                    kv.getTagsLengthUnsigned());
+                  while (i.hasNext()) {
+                    newTags.add(i.next());
+                  }
+                }
+
+                // Cell TTL handling
+
+                if (append.getTTL() != Long.MAX_VALUE) {
+                  // Add the new TTL tag
+                  newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+                }
+
+                // Rebuild tags
+                byte[] tagBytes = Tag.fromList(newTags);
+
+                // allocate an empty cell once
+                newKv = new KeyValue(row.length, kv.getFamilyLength(),
                     kv.getQualifierLength(), ts, KeyValue.Type.Put,
                     oldKv.getValueLength() + kv.getValueLength(),
-                    oldKv.getTagsLengthUnsigned() + kv.getTagsLengthUnsigned());
-                // copy in the value
-                System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
-                    newKV.getBuffer(), newKV.getValueOffset(),
-                    oldKv.getValueLength());
-                System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
-                    newKV.getBuffer(),
-                    newKV.getValueOffset() + oldKv.getValueLength(),
-                    kv.getValueLength());
-                // copy in the tags
-                System.arraycopy(oldKv.getBuffer(), oldKv.getTagsOffset(), newKV.getBuffer(),
-                    newKV.getTagsOffset(), oldKv.getTagsLengthUnsigned());
-                System.arraycopy(kv.getBuffer(), kv.getTagsOffset(), newKV.getBuffer(),
-                    newKV.getTagsOffset() + oldKv.getTagsLengthUnsigned(),
-                    kv.getTagsLengthUnsigned());
+                    tagBytes.length);
                 // copy in row, family, and qualifier
-                System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
-                    newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
-                System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
-                    newKV.getBuffer(), newKV.getFamilyOffset(),
-                    kv.getFamilyLength());
-                System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
-                    newKV.getBuffer(), newKV.getQualifierOffset(),
-                    kv.getQualifierLength());
+                System.arraycopy(kv.getRowArray(), kv.getRowOffset(),
+                  newKv.getRowArray(), newKv.getRowOffset(), kv.getRowLength());
+                System.arraycopy(kv.getFamilyArray(), kv.getFamilyOffset(),
+                  newKv.getFamilyArray(), newKv.getFamilyOffset(),
+                  kv.getFamilyLength());
+                System.arraycopy(kv.getQualifierArray(), kv.getQualifierOffset(),
+                  newKv.getQualifierArray(), newKv.getQualifierOffset(),
+                  kv.getQualifierLength());
+                // copy in the value
+                System.arraycopy(oldKv.getValueArray(), oldKv.getValueOffset(),
+                  newKv.getValueArray(), newKv.getValueOffset(),
+                  oldKv.getValueLength());
+                System.arraycopy(kv.getValueArray(), kv.getValueOffset(),
+                  newKv.getValueArray(),
+                  newKv.getValueOffset() + oldKv.getValueLength(),
+                  kv.getValueLength());
+                // Copy in tag data
+                System.arraycopy(tagBytes, 0, newKv.getTagsArray(), newKv.getTagsOffset(),
+                  tagBytes.length);
                 idx++;
               } else {
-                newKV = kv;
                 // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
                 // so only need to update the timestamp to 'now'
-                newKV.updateLatestStamp(Bytes.toBytes(now));
+                kv.updateLatestStamp(Bytes.toBytes(now));
+
+                // Cell TTL handling
+
+                if (append.getTTL() != Long.MAX_VALUE) {
+                  List<Tag> newTags = new ArrayList<Tag>(1);
+                  newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+                  // Add the new TTL tag
+                  newKv = new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+                    kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
+                    kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
+                    kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()),
+                    kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(),
+                    newTags);
+                } else {
+                  newKv = kv;
+                }
               }
-              newKV.setMvccVersion(w.getWriteNumber());
+              newKv.setMvccVersion(w.getWriteNumber());
+
               // Give coprocessors a chance to update the new cell
               if (coprocessorHost != null) {
-                newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
-                    RegionObserver.MutationType.APPEND, append, oldKv, (Cell) newKV));
+                newKv = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
+                    RegionObserver.MutationType.APPEND, append, oldKv, (Cell) newKv));
               }
-              kvs.add(newKV);
+              kvs.add(newKv);
 
               // Append update to WAL
               if (writeToWAL) {
                 if (walEdits == null) {
                   walEdits = new WALEdit();
                 }
-                walEdits.add(newKV);
+                walEdits.add(newKv);
               }
             }
 
@@ -5374,6 +5480,9 @@ public class HRegion implements HeapSize { // , Writable{
     return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
   }
 
+  // TODO: There's a lot of boiler plate code identical to append.
+  // We should refactor append and increment as local get-mutate-put
+  // transactions, so all stores only go through one code path for puts.
   /**
    * Perform one or more increment operations on a row.
    * @param increment
@@ -5442,13 +5551,23 @@ public class HRegion implements HeapSize { // , Writable{
             // Iterate the input columns and update existing values if they were
             // found, otherwise add new column initialized to the increment amount
             int idx = 0;
-            for (Cell kv: family.getValue()) {
-              long amount = Bytes.toLong(CellUtil.cloneValue(kv));
+            for (Cell cell: family.getValue()) {
+              long amount = Bytes.toLong(CellUtil.cloneValue(cell));
               boolean noWriteBack = (amount == 0);
+              List<Tag> newTags = new ArrayList<Tag>();
+
+              // Carry forward any tags that might have been added by a coprocessor
+              if (cell.getTagsLengthUnsigned() > 0) {
+                Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
+                  cell.getTagsOffset(), cell.getTagsLengthUnsigned());
+                while (i.hasNext()) {
+                  newTags.add(i.next());
+                }
+              }
 
               Cell c = null;
               long ts = now;
-              if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
+              if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
                 c = results.get(idx);
                 ts = Math.max(now, c.getTimestamp());
                 if(c.getValueLength() == Bytes.SIZEOF_LONG) {
@@ -5458,48 +5577,52 @@ public class HRegion implements HeapSize { // , Writable{
                   throw new org.apache.hadoop.hbase.DoNotRetryIOException(
                       "Attempted to increment field that isn't 64 bits wide");
                 }
+                // Carry tags forward from previous version
+                if (c.getTagsLength() > 0) {
+                  Iterator<Tag> i = CellUtil.tagsIterator(c.getTagsArray(),
+                    c.getTagsOffset(), c.getTagsLength());
+                  while (i.hasNext()) {
+                    newTags.add(i.next());
+                  }
+                }
                 idx++;
               }
 
               // Append new incremented KeyValue to list
-              byte[] q = CellUtil.cloneQualifier(kv);
+              byte[] q = CellUtil.cloneQualifier(cell);
               byte[] val = Bytes.toBytes(amount);
-              int oldCellTagsLen = (c == null) ? 0 : c.getTagsLengthUnsigned();
-              int incCellTagsLen = kv.getTagsLengthUnsigned();
-              KeyValue newKV = new KeyValue(row.length, family.getKey().length, q.length, ts,
-                  KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
-              System.arraycopy(row, 0, newKV.getBuffer(), newKV.getRowOffset(), row.length);
-              System.arraycopy(family.getKey(), 0, newKV.getBuffer(), newKV.getFamilyOffset(),
-                  family.getKey().length);
-              System.arraycopy(q, 0, newKV.getBuffer(), newKV.getQualifierOffset(), q.length);
-              // copy in the value
-              System.arraycopy(val, 0, newKV.getBuffer(), newKV.getValueOffset(), val.length);
-              // copy tags
-              if (oldCellTagsLen > 0) {
-                System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getBuffer(),
-                    newKV.getTagsOffset(), oldCellTagsLen);
-              }
-              if (incCellTagsLen > 0) {
-                System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getBuffer(),
-                    newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
+
+              // Add the TTL tag if the mutation carried one
+              if (increment.getTTL() != Long.MAX_VALUE) {
+                newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
               }
-              newKV.setMvccVersion(w.getWriteNumber());
+
+              KeyValue newKv = new KeyValue(row, 0, row.length,
+                family.getKey(), 0, family.getKey().length,
+                q, 0, q.length,
+                ts,
+                KeyValue.Type.Put,
+                val, 0, val.length,
+                newTags);
+
+              newKv.setMvccVersion(w.getWriteNumber());
+
               // Give coprocessors a chance to update the new cell
               if (coprocessorHost != null) {
-                newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
-                    RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKV));
+                newKv = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
+                    RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKv));
               }
-              allKVs.add(newKV);
+              allKVs.add(newKv);
 
               if (!noWriteBack) {
-                kvs.add(newKV);
+                kvs.add(newKv);
 
                 // Prepare WAL updates
                 if (writeToWAL) {
                   if (walEdits == null) {
                     walEdits = new WALEdit();
                   }
-                  walEdits.add(newKV);
+                  walEdits.add(newKv);
                 }
               }
             }

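Because append and increment results now carry the mutation's TTL as a cell tag, an incremented value can expire independently of older versions of the same column, which then become visible again (the "disappearing increments" case exercised in TestHRegion below). A minimal sketch, assuming a table handle named table and hypothetical row/family/qualifier arrays:

    Increment incr = new Increment(row);
    incr.addColumn(family, qualifier, 1L);
    incr.setTTL(5000L); // the result cell expires five seconds after its timestamp
    table.increment(incr);
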
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 39a0677..5ea1a9c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CompoundConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
@@ -57,6 +58,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.RemoteExceptionHandler;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.compress.Compression;
 import org.apache.hadoop.hbase.io.crypto.Cipher;
@@ -1597,8 +1600,38 @@ public class HStore implements Store {
     return wantedVersions > maxVersions ? maxVersions: wantedVersions;
   }
 
-  static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
-    return key.getTimestamp() < oldestTimestamp;
+  /**
+   * @param cell
+   * @param oldestTimestamp
+   * @param now
+   * @return true if the cell is expired
+   */
+  static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
+    // Do not create Iterator or Tag objects unless the cell actually has
+    // tags
+    if (cell.getTagsLengthUnsigned() > 0) {
+      // Look for a TTL tag first. Use it instead of the family setting if
+      // found. If a cell has multiple TTLs, resolve the conflict by using the
+      // first tag encountered.
+      Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+        cell.getTagsLengthUnsigned());
+      while (i.hasNext()) {
+        Tag t = i.next();
+        if (TagType.TTL_TAG_TYPE == t.getType()) {
+          // Unlike the family TTL in the schema, cell TTLs are stored in
+          // milliseconds, so there is no need to convert
+          long ts = cell.getTimestamp();
+          assert t.getTagLength() == Bytes.SIZEOF_LONG;
+          long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
+          if (ts + ttl < now) {
+            return true;
+          }
+          // Per cell TTLs cannot extend lifetime beyond family settings, so
+          // fall through to check that
+          break;
+        }
+      }
+    }
+    return false;
   }
 
   @Override

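Taken together with the plain timestamp check that callers still perform, the expiry rule composes as: a cell is dead if it is older than the family TTL horizon, or if its own TTL tag has lapsed. A per-cell TTL can therefore only shorten a cell's lifetime, never extend it past the family setting. A minimal sketch of the combined predicate, written the way GetClosestRowBeforeTracker.isExpired() above composes it (cell, oldestUnexpiredTs and now stand in for the caller's state):

    boolean expired = cell.getTimestamp() < oldestUnexpiredTs      // family TTL horizon
        || HStore.isCellTTLExpired(cell, oldestUnexpiredTs, now);  // per-cell TTL tag
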
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index 98921f4..2b56edb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -24,6 +24,7 @@ import java.util.NavigableSet;
 
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeepDeletedCells;
 import org.apache.hadoop.hbase.KeyValue;
@@ -100,6 +101,10 @@ public class ScanQueryMatcher {
   private final long earliestPutTs;
   private final long ttl;
 
+  /** The oldest timestamp we are interested in, based on TTL */
+  private final long oldestUnexpiredTS;
+  private final long now;
+
   /** readPoint over which the KVs are unconditionally included */
   protected long maxReadPointToTrackVersions;
 
@@ -152,7 +157,7 @@ public class ScanQueryMatcher {
    */
   public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
       ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
-      RegionCoprocessorHost regionCoprocessorHost) throws IOException {
+      long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
     this.tr = scan.getTimeRange();
     this.rowComparator = scanInfo.getComparator();
     this.regionCoprocessorHost = regionCoprocessorHost;
@@ -162,6 +167,9 @@ public class ScanQueryMatcher {
         scanInfo.getFamily());
     this.filter = scan.getFilter();
     this.earliestPutTs = earliestPutTs;
+    this.oldestUnexpiredTS = oldestUnexpiredTS;
+    this.now = now;
+
     this.maxReadPointToTrackVersions = readPointToUse;
     this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
     this.ttl = oldestUnexpiredTS;
@@ -216,18 +224,18 @@ public class ScanQueryMatcher {
    * @param scanInfo The store's immutable scan info
    * @param columns
    * @param earliestPutTs Earliest put seen in any of the store files.
-   * @param oldestUnexpiredTS the oldest timestamp we are interested in,
-   *  based on TTL
+   * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
+   * @param now the current server time
    * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
    * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
    * @param regionCoprocessorHost 
    * @throws IOException 
    */
   public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
-      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, byte[] dropDeletesFromRow,
+      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow,
       byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
     this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs,
-        oldestUnexpiredTS, regionCoprocessorHost);
+        oldestUnexpiredTS, now, regionCoprocessorHost);
     Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null));
     this.dropDeletesFromRow = dropDeletesFromRow;
     this.dropDeletesToRow = dropDeletesToRow;
@@ -237,10 +245,10 @@ public class ScanQueryMatcher {
    * Constructor for tests
    */
   ScanQueryMatcher(Scan scan, ScanInfo scanInfo,
-      NavigableSet<byte[]> columns, long oldestUnexpiredTS) throws IOException {
+      NavigableSet<byte[]> columns, long oldestUnexpiredTS, long now) throws IOException {
     this(scan, scanInfo, columns, ScanType.USER_SCAN,
           Long.MAX_VALUE, /* max Readpoint to track versions */
-        HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, null);
+        HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, null);
   }
 
   /**
@@ -299,7 +307,6 @@ public class ScanQueryMatcher {
       }
     }
 
-
     // optimize case.
     if (this.stickyNextRow)
         return MatchCode.SEEK_NEXT_ROW;
@@ -322,8 +329,13 @@ public class ScanQueryMatcher {
     long timestamp = Bytes.toLong(bytes, initialOffset + keyLength - KeyValue.TIMESTAMP_TYPE_SIZE);
     // check for early out based on timestamp alone
     if (columns.isDone(timestamp)) {
-        return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
+      return columns.getNextRowOrNextColumn(kv.getQualifierArray(), kv.getQualifierOffset(),
+        kv.getQualifierLength());
     }
+    // check if the cell is expired by cell TTL
+    if (HStore.isCellTTLExpired(kv, this.oldestUnexpiredTS, this.now)) {
+      return MatchCode.SKIP;
+    }
 
     /*
      * The delete logic is pretty complicated now.

http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index e8d5f1d..853e1bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -76,6 +76,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
   protected final Scan scan;
   protected final NavigableSet<byte[]> columns;
   protected final long oldestUnexpiredTS;
+  protected final long now;
   protected final int minVersions;
 
   /**
@@ -121,7 +122,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     explicitColumnQuery = numCol > 0;
     this.scan = scan;
     this.columns = columns;
-    oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl;
+    this.now = EnvironmentEdgeManager.currentTimeMillis();
+    this.oldestUnexpiredTS = now - ttl;
     this.minVersions = minVersions;
 
     if (store != null && ((HStore)store).getHRegion() != null
@@ -171,7 +173,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     }
     matcher = new ScanQueryMatcher(scan, scanInfo, columns,
         ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
-        oldestUnexpiredTS, store.getCoprocessorHost());
+        oldestUnexpiredTS, now, store.getCoprocessorHost());
 
     this.store.addChangedReaderObserver(this);
 
@@ -236,10 +238,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
         ((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
     if (dropDeletesFromRow == null) {
       matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
-          earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost());
+          earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
     } else {
       matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
-          oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
+          oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
     }
 
     // Filter the list of scanners using Bloom filters, time range, TTL, etc.
@@ -279,7 +281,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
         scanInfo.getMinVersions(), readPt);
     this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
-        Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null);
+        Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
 
     // In unit tests, the store could be null
     if (this.store != null) {

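Note that now and oldestUnexpiredTS are captured from a single clock read when the scanner is constructed, so the family TTL check and the new cell TTL check agree on what "current time" means for the lifetime of one scan. A minimal sketch of that computation (ttl here is the family TTL passed in from ScanInfo):

    long now = EnvironmentEdgeManager.currentTimeMillis();
    long oldestUnexpiredTS = now - ttl; // family TTL horizon for this store
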
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
new file mode 100644
index 0000000..062c05a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestImportTSVWithTTLs implements Configurable {
+
+  protected static final Log LOG = LogFactory.getLog(TestImportTSVWithTTLs.class);
+  protected static final String NAME = TestImportTSVWithTTLs.class.getSimpleName();
+  protected static HBaseTestingUtility util = new HBaseTestingUtility();
+
+  /**
+   * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
+   * true.
+   */
+  protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
+
+  /**
+   * Force use of combiner in doMROnTableTest. Boolean. Default is true.
+   */
+  protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
+
+  private final String FAMILY = "FAM";
+  private static Configuration conf;
+
+  @Override
+  public Configuration getConf() {
+    return util.getConfiguration();
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    throw new IllegalArgumentException("setConf not supported");
+  }
+
+  @BeforeClass
+  public static void provisionCluster() throws Exception {
+    conf = util.getConfiguration();
+    // We don't check persistence in HFiles in this test, but if we ever do we will
+    // need this on branches where the default HFile version is not 3 (i.e. 0.98)
+    conf.setInt("hfile.format.version", 3);
+    conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName());
+    util.startMiniCluster();
+    util.startMiniMapReduceCluster();
+  }
+
+  @AfterClass
+  public static void releaseCluster() throws Exception {
+    util.shutdownMiniMapReduceCluster();
+    util.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMROnTable() throws Exception {
+    String tableName = "test-" + UUID.randomUUID();
+
+    // Prepare the arguments required for the test.
+    String[] args = new String[] {
+        "-D" + ImportTsv.MAPPER_CONF_KEY
+            + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
+        "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
+        "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
+    String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n";
+    util.createTable(tableName, FAMILY);
+    doMROnTableTest(util, FAMILY, data, args, 1);
+    util.deleteTable(tableName);
+  }
+
+  protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+      String[] args, int valueMultiplier) throws Exception {
+    TableName table = TableName.valueOf(args[args.length - 1]);
+    Configuration conf = new Configuration(util.getConfiguration());
+
+    // populate input file
+    FileSystem fs = FileSystem.get(conf);
+    Path inputPath = fs.makeQualified(new Path(util
+        .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
+    FSDataOutputStream op = fs.create(inputPath, true);
+    op.write(Bytes.toBytes(data));
+    op.close();
+    LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+    if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+      LOG.debug("Forcing combiner.");
+      conf.setInt("mapreduce.map.combine.minspills", 1);
+    }
+
+    // run the import
+    List<String> argv = new ArrayList<String>(Arrays.asList(args));
+    argv.add(inputPath.toString());
+    Tool tool = new ImportTsv();
+    LOG.debug("Running ImportTsv with arguments: " + argv);
+    try {
+      // Job will fail if observer rejects entries without TTL
+      assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
+    } finally {
+      // Clean up
+      if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+        LOG.debug("Deleting test subdirectory");
+        util.cleanupDataTestDirOnTestFS(table.getNameAsString());
+      }
+    }
+
+    return tool;
+  }
+
+  public static class TTLCheckingObserver extends BaseRegionObserver {
+
+    @Override
+    public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
+        Durability durability) throws IOException {
+      HRegion region = e.getEnvironment().getRegion();
+      if (!region.getRegionInfo().isMetaTable()
+          && !region.getRegionInfo().getTable().isSystemTable()) {
+        // The put carries the TTL attribute
+        if (put.getTTL() != Long.MAX_VALUE) {
+          return;
+        }
+        throw new IOException("Operation does not have TTL set");
+      }
+    }
+  }
+}

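Outside of the test harness, the same import could be driven from the command line. A hedged sketch, assuming the stock ImportTsv driver, a pre-created table t1 with family d, and tab-separated input (the column key matches the test arguments above; table, family, and path are hypothetical):

    hbase org.apache.hadoop.hbase.mapreduce.ImportTsv \
      -Dimporttsv.columns=HBASE_ROW_KEY,d:c1,HBASE_CELL_TTL \
      t1 /path/to/input.tsv
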
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 777b3a5..b40a7eb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.Tag;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.HRegionInfo;
@@ -88,6 +89,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
 import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
 import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TagType;
 import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Append;
 import org.apache.hadoop.hbase.client.Delete;
@@ -109,6 +111,7 @@ import org.apache.hadoop.hbase.filter.NullComparator;
 import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
 import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.hfile.HFile;
 import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -127,6 +130,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.test.MetricsAssertHelper;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
@@ -5139,6 +5143,133 @@ public class TestHRegion {
     }
   }
 
+  @Test
+  public void testCellTTLs() throws IOException {
+    IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
+    EnvironmentEdgeManager.injectEdge(edge);
+
+    final byte[] row = Bytes.toBytes("testRow");
+    final byte[] q1 = Bytes.toBytes("q1");
+    final byte[] q2 = Bytes.toBytes("q2");
+    final byte[] q3 = Bytes.toBytes("q3");
+    final byte[] q4 = Bytes.toBytes("q4");
+
+    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testCellTTLs"));
+    HColumnDescriptor hcd = new HColumnDescriptor(fam1);
+    hcd.setTimeToLive(10); // 10 seconds
+    htd.addFamily(hcd);
+
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
+
+    HRegion region = HRegion.createHRegion(new HRegionInfo(htd.getTableName(),
+        HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
+      TEST_UTIL.getDataTestDir(), conf, htd);
+    assertNotNull(region);
+    try {
+      long now = EnvironmentEdgeManager.currentTimeMillis();
+      // Add a cell that will expire in 5 seconds via cell TTL
+      region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
+        HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+          // TTL tags specify ts in milliseconds
+          new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+      // Add a cell that will expire after 10 seconds via family setting
+      region.put(new Put(row).add(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
+      // Add a cell that will expire in 15 seconds via cell TTL
+      region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
+        HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+          // TTL tags specify ts in milliseconds
+          new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+      // Add a cell that will expire in 20 seconds via family setting
+      region.put(new Put(row).add(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
+
+      // Flush so we are sure store scanning gets this right
+      region.flushcache();
+
+      // A query at time T+0 should return all cells
+      Result r = region.get(new Get(row));
+      assertNotNull(r.getValue(fam1, q1));
+      assertNotNull(r.getValue(fam1, q2));
+      assertNotNull(r.getValue(fam1, q3));
+      assertNotNull(r.getValue(fam1, q4));
+
+      // Increment time to T+5 seconds
+      edge.incrementTime(5000);
+
+      r = region.get(new Get(row));
+      assertNull(r.getValue(fam1, q1));
+      assertNotNull(r.getValue(fam1, q2));
+      assertNotNull(r.getValue(fam1, q3));
+      assertNotNull(r.getValue(fam1, q4));
+
+      // Increment time to T+10 seconds
+      edge.incrementTime(5000);
+
+      r = region.get(new Get(row));
+      assertNull(r.getValue(fam1, q1));
+      assertNull(r.getValue(fam1, q2));
+      assertNotNull(r.getValue(fam1, q3));
+      assertNotNull(r.getValue(fam1, q4));
+
+      // Increment time to T+15 seconds
+      edge.incrementTime(5000);
+
+      r = region.get(new Get(row));
+      assertNull(r.getValue(fam1, q1));
+      assertNull(r.getValue(fam1, q2));
+      assertNull(r.getValue(fam1, q3));
+      assertNotNull(r.getValue(fam1, q4));
+
+      // Increment time to T+25 seconds
+      edge.incrementTime(10000);
+
+      r = region.get(new Get(row));
+      assertNull(r.getValue(fam1, q1));
+      assertNull(r.getValue(fam1, q2));
+      assertNull(r.getValue(fam1, q3));
+      assertNull(r.getValue(fam1, q4));
+
+      // Fun with disappearing increments
+
+      // Start at 1
+      region.put(new Put(row).add(fam1, q1, Bytes.toBytes(1L)));
+      r = region.get(new Get(row));
+      byte[] val = r.getValue(fam1, q1);
+      assertNotNull(val);
+      assertEquals(Bytes.toLong(val), 1L);
+
+      // Increment with a TTL of 5 seconds
+      Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
+      incr.setTTL(5000);
+      region.increment(incr); // 2
+
+      // New value should be 2
+      r = region.get(new Get(row));
+      val = r.getValue(fam1, q1);
+      assertNotNull(val);
+      assertEquals(Bytes.toLong(val), 2L);
+
+      // Increment time to T+30 seconds
+      edge.incrementTime(5000);
+
+      // Value should be back to 1
+      r = region.get(new Get(row));
+      val = r.getValue(fam1, q1);
+      assertNotNull(val);
+      assertEquals(Bytes.toLong(val), 1L);
+
+      // Increment time to T+35 seconds
+      edge.incrementTime(5000);
+
+      // Original value written at T+25 should be gone now via family TTL
+      r = region.get(new Get(row));
+      assertNull(r.getValue(fam1, q1));
+
+    } finally {
+      HRegion.closeHRegion(region);
+    }
+  }
+
   private static HRegion initHRegion(byte[] tableName, String callingMethod,
       byte[]... families) throws IOException {
     return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),

http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
index aafa710..2ef9838 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
@@ -91,11 +91,12 @@ public class TestQueryMatcher extends HBaseTestCase {
 
   }
 
-    private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
-    // 2,4,5    
+  private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
+    long now = EnvironmentEdgeManager.currentTimeMillis();
+    // 2,4,5
     ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
         0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), get.getFamilyMap().get(fam2),
-        EnvironmentEdgeManager.currentTimeMillis() - ttl);
+        now - ttl, now);
 
     List<KeyValue> memstore = new ArrayList<KeyValue>();
     memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -175,9 +176,10 @@ public class TestQueryMatcher extends HBaseTestCase {
     expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
     expected.add(ScanQueryMatcher.MatchCode.DONE);
 
+    long now = EnvironmentEdgeManager.currentTimeMillis();
     ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
         0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null,
-        EnvironmentEdgeManager.currentTimeMillis() - ttl);
+        now - ttl, now);
 
     List<KeyValue> memstore = new ArrayList<KeyValue>();
     memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -231,7 +233,7 @@ public class TestQueryMatcher extends HBaseTestCase {
     long now = EnvironmentEdgeManager.currentTimeMillis();
     ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
         0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), get.getFamilyMap().get(fam2),
-        now - testTTL);
+        now - testTTL, now);
 
     KeyValue [] kvs = new KeyValue[] {
         new KeyValue(row1, fam2, col1, now-100, data),
@@ -285,7 +287,7 @@ public class TestQueryMatcher extends HBaseTestCase {
     long now = EnvironmentEdgeManager.currentTimeMillis();
     ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
         0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null,
-        now - testTTL);
+        now - testTTL, now);
 
     KeyValue [] kvs = new KeyValue[] {
         new KeyValue(row1, fam2, col1, now-100, data),
@@ -343,7 +345,7 @@ public class TestQueryMatcher extends HBaseTestCase {
     NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
 
     ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,
-        HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to, null);
+        HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, now, from, to, null);
     List<ScanQueryMatcher.MatchCode> actual =
         new ArrayList<ScanQueryMatcher.MatchCode>(rows.length);
     byte[] prevRow = null;

http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
index f218a43..b00e988 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
@@ -414,8 +414,13 @@ public class TestTags {
       tags = TestCoprocessorForTags.tags;
       assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
       assertEquals(2, tags.size());
-      assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
-      assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+      // We cannot assume the ordering of tags
+      List<String> tagValues = new ArrayList<String>();
+      for (Tag tag: tags) {
+        tagValues.add(Bytes.toString(tag.getValue()));
+      }
+      assertTrue(tagValues.contains("tag1"));
+      assertTrue(tagValues.contains("tag2"));
       TestCoprocessorForTags.checkTagPresence = false;
       TestCoprocessorForTags.tags = null;
 
@@ -471,8 +476,13 @@ public class TestTags {
       kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
       tags = TestCoprocessorForTags.tags;
       assertEquals(2, tags.size());
-      assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
-      assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+      // We cannot assume the ordering of tags
+      tagValues.clear();
+      for (Tag tag: tags) {
+        tagValues.add(Bytes.toString(tag.getValue()));
+      }
+      assertTrue(tagValues.contains("tag1"));
+      assertTrue(tagValues.contains("tag2"));
       TestCoprocessorForTags.checkTagPresence = false;
       TestCoprocessorForTags.tags = null;
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-shell/src/main/ruby/hbase/table.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/table.rb b/hbase-shell/src/main/ruby/hbase/table.rb
index 3fe006b..10b754a 100644
--- a/hbase-shell/src/main/ruby/hbase/table.rb
+++ b/hbase-shell/src/main/ruby/hbase/table.rb
@@ -136,17 +136,19 @@ EOF
          set_attributes(p, attributes) if attributes
          visibility = args[VISIBILITY]
          set_cell_visibility(p, visibility) if visibility
+         ttl = args[TTL]
+         set_op_ttl(p, ttl) if ttl
       end
       #Case where attributes are specified without timestamp
       if timestamp.kind_of?(Hash)
       	timestamp.each do |k, v|
-      	  if v.kind_of?(Hash)
-      	  	set_attributes(p, v) if v
-      	  end
-      	  if v.kind_of?(String)
-      	  	set_cell_visibility(p, v) if v
-      	  end
-      	  
+          if k == 'ATTRIBUTES'
+            set_attributes(p, v)
+          elsif k == 'VISIBILITY'
+            set_cell_visibility(p, v)
+          elsif k == "TTL"
+            set_op_ttl(p, v)
+          end
         end
         timestamp = nil
       end  
@@ -214,6 +216,8 @@ EOF
       	visibility = args[VISIBILITY]
         set_attributes(incr, attributes) if attributes
         set_cell_visibility(incr, visibility) if visibility
+        ttl = args[TTL]
+        set_op_ttl(incr, ttl) if ttl
       end
       incr.addColumn(family, qualifier, value)
       @table.increment(incr)
@@ -232,6 +236,8 @@ EOF
       	visibility = args[VISIBILITY]
         set_attributes(append, attributes) if attributes
         set_cell_visibility(append, visibility) if visibility
+        ttl = args[TTL]
+        set_op_ttl(append, ttl) if ttl
       end
       append.add(family, qualifier, value.to_s.to_java_bytes)
       @table.append(append)
@@ -532,6 +538,10 @@ EOF
           auths.to_java(:string)))
     end
 
+    def set_op_ttl(op, ttl)
+      op.setTTL(ttl.to_java(:long))
+    end
+
     #----------------------------
     # Add general administration utilities to the shell
     # each of the names below adds this method name to the table

http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-shell/src/test/ruby/hbase/table_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/table_test.rb b/hbase-shell/src/test/ruby/hbase/table_test.rb
index 7272229..fa2990d 100644
--- a/hbase-shell/src/test/ruby/hbase/table_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/table_test.rb
@@ -530,5 +530,18 @@ module Hbase
       end
     end
 
+    define_test "mutation with TTL should expire" do
+      @test_table.put('ttlTest', 'x:a', 'foo', { TTL => 1000 } )
+      begin
+        res = @test_table._get_internal('ttlTest', 'x:a')
+        assert_not_nil(res)
+        sleep 2
+        res = @test_table._get_internal('ttlTest', 'x:a')
+        assert_nil(res)
+      ensure
+        @test_table.delete('ttlTest', 'x:a')
+      end
+    end
+
   end
 end