You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2014/12/05 21:00:58 UTC
[1/3] hbase git commit: HBASE-10560 Per cell TTLs
Repository: hbase
Updated Branches:
refs/heads/0.98 45c8be3fc -> 869c5665c
refs/heads/branch-1 1bd27bfa2 -> 004e977ba
refs/heads/master f83e25e18 -> 09cd3d7bf
HBASE-10560 Per cell TTLs
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/09cd3d7b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/09cd3d7b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/09cd3d7b
Branch: refs/heads/master
Commit: 09cd3d7bfb9f4c4b279a74441d4059da6b8177d2
Parents: f83e25e
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Dec 5 10:59:56 2014 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Dec 5 11:10:26 2014 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/Append.java | 5 +
.../org/apache/hadoop/hbase/client/Delete.java | 5 +
.../apache/hadoop/hbase/client/Increment.java | 5 +
.../apache/hadoop/hbase/client/Mutation.java | 34 +++
.../org/apache/hadoop/hbase/client/Put.java | 5 +
.../java/org/apache/hadoop/hbase/TagType.java | 1 +
.../hadoop/hbase/mapreduce/CellCreator.java | 35 ++-
.../hadoop/hbase/mapreduce/ImportTsv.java | 55 ++++-
.../hadoop/hbase/mapreduce/TextSortReducer.java | 24 +-
.../hbase/mapreduce/TsvImporterMapper.java | 26 ++-
.../GetClosestRowBeforeTracker.java | 25 ++-
.../hadoop/hbase/regionserver/HRegion.java | 224 ++++++++++++++-----
.../hadoop/hbase/regionserver/HStore.java | 36 ++-
.../hbase/regionserver/ScanQueryMatcher.java | 26 ++-
.../hadoop/hbase/regionserver/StoreScanner.java | 12 +-
.../hbase/mapreduce/TestImportTSVWithTTLs.java | 173 ++++++++++++++
.../hadoop/hbase/regionserver/TestHRegion.java | 131 +++++++++++
.../hbase/regionserver/TestQueryMatcher.java | 18 +-
.../TestScanWildcardColumnTracker.java | 3 -
.../hadoop/hbase/regionserver/TestTags.java | 18 +-
hbase-shell/src/main/ruby/hbase/table.rb | 24 +-
hbase-shell/src/test/ruby/hbase/table_test.rb | 13 ++
22 files changed, 793 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
index 9e3be3e..d5a4552 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
@@ -171,4 +171,9 @@ public class Append extends Mutation {
public Append setACL(Map<String, Permission> perms) {
return (Append) super.setACL(perms);
}
+
+ @Override
+ public Append setTTL(long ttl) {
+ return (Append) super.setTTL(ttl);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
index ea76d0d..d947ef8 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
@@ -461,4 +461,9 @@ public class Delete extends Mutation implements Comparable<Row> {
public Delete setACL(Map<String, Permission> perms) {
return (Delete) super.setACL(perms);
}
+
+ @Override
+ public Delete setTTL(long ttl) {
+ throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported");
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
index b12ee02..b6e6a52 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
@@ -317,4 +317,9 @@ public class Increment extends Mutation implements Comparable<Row> {
public Increment setACL(Map<String, Permission> perms) {
return (Increment) super.setACL(perms);
}
+
+ @Override
+ public Increment setTTL(long ttl) {
+ return (Increment) super.setTTL(ttl);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index b7f7d1b..28284e5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -76,6 +76,11 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
*/
private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
+ /**
+ * The attribute for storing TTL for the result of the mutation.
+ */
+ private static final String OP_ATTRIBUTE_TTL = "_ttl";
+
protected byte [] row = null;
protected long ts = HConstants.LATEST_TIMESTAMP;
protected Durability durability = Durability.USE_DEFAULT;
@@ -199,6 +204,12 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
if (getId() != null) {
map.put("id", getId());
}
+ // Add the TTL if set
+ // Long.MAX_VALUE is the default, and is interpreted to mean this attribute
+ // has not been set.
+ if (getTTL() != Long.MAX_VALUE) {
+ map.put("ttl", getTTL());
+ }
return map;
}
@@ -417,6 +428,29 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
}
/**
+ * Return the TTL requested for the result of the mutation, in milliseconds.
+ * @return the TTL requested for the result of the mutation, in milliseconds,
+ * or Long.MAX_VALUE if unset
+ */
+ public long getTTL() {
+ byte[] ttlBytes = getAttribute(OP_ATTRIBUTE_TTL);
+ if (ttlBytes != null) {
+ return Bytes.toLong(ttlBytes);
+ }
+ return Long.MAX_VALUE;
+ }
+
+ /**
+ * Set the TTL desired for the result of the mutation, in milliseconds.
+ * @param ttl the TTL desired for the result of the mutation, in milliseconds
+ * @return this
+ */
+ public Mutation setTTL(long ttl) {
+ setAttribute(OP_ATTRIBUTE_TTL, Bytes.toBytes(ttl));
+ return this;
+ }
+
+ /**
* Subclasses should override this method to add the heap size of their own fields.
* @return the heap size to add (will be aligned).
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
index f0d3a72..b9d652d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
@@ -460,4 +460,9 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
public Put setACL(Map<String, Permission> perms) {
return (Put) super.setACL(perms);
}
+
+ @Override
+ public Put setTTL(long ttl) {
+ return (Put) super.setTTL(ttl);
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index f088dcf..2095b7a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -30,4 +30,5 @@ public final class TagType {
public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
// String based tag type used in replication
public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
+ public static final byte TTL_TAG_TYPE = (byte)8;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
index b3dfee7..001f64d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
@@ -69,7 +69,7 @@ public class CellCreator {
byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
int vlength) throws IOException {
return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength,
- timestamp, value, voffset, vlength, null);
+ timestamp, value, voffset, vlength, (List<Tag>)null);
}
/**
@@ -90,6 +90,7 @@ public class CellCreator {
* @return created Cell
* @throws IOException
*/
+ @Deprecated
public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
int vlength, String visExpression) throws IOException {
@@ -100,4 +101,36 @@ public class CellCreator {
return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags);
}
+
+ /**
+ * @param row row key
+ * @param roffset row offset
+ * @param rlength row length
+ * @param family family name
+ * @param foffset family offset
+ * @param flength family length
+ * @param qualifier column qualifier
+ * @param qoffset qualifier offset
+ * @param qlength qualifier length
+ * @param timestamp version timestamp
+ * @param value column value
+ * @param voffset value offset
+ * @param vlength value length
+ * @param tags tags to attach to the created cell, may be null
+ * @return created Cell
+ * @throws IOException
+ */
+ public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+ byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+ int vlength, List<Tag> tags) throws IOException {
+ return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
+ qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags);
+ }
+
+ /**
+ * @return Visibility expression resolver
+ */
+ public VisibilityExpressionResolver getVisibilityExpressionResolver() {
+ return this.visExpResolver;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index 7ae6599..54e0034 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -125,13 +125,20 @@ public class ImportTsv extends Configured implements Tool {
public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
+ public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL";
+
private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
+ public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1;
+
private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+
+ private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX;
+
/**
* @param columnsSpecification the list of columns to parser out, comma separated.
* The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
@@ -162,14 +169,18 @@ public class ImportTsv extends Configured implements Tool {
timestampKeyColumnIndex = i;
continue;
}
- if(ATTRIBUTES_COLUMN_SPEC.equals(str)) {
+ if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
attrKeyColumnIndex = i;
continue;
}
- if(CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
+ if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
cellVisibilityColumnIndex = i;
continue;
}
+ if (CELL_TTL_COLUMN_SPEC.equals(str)) {
+ cellTTLColumnIndex = i;
+ continue;
+ }
String[] parts = str.split(":", 2);
if (parts.length == 1) {
families[i] = str.getBytes();
@@ -197,6 +208,10 @@ public class ImportTsv extends Configured implements Tool {
return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
}
+ public boolean hasCellTTL() {
+ return cellTTLColumnIndex != DEFAULT_CELL_TTL_COLUMN_INDEX; // default index means no TTL column in the spec
+ }
+
public int getAttributesKeyColumnIndex() {
return attrKeyColumnIndex;
}
@@ -204,9 +219,15 @@ public class ImportTsv extends Configured implements Tool {
public int getCellVisibilityColumnIndex() {
return cellVisibilityColumnIndex;
}
+
+ public int getCellTTLColumnIndex() {
+ return cellTTLColumnIndex;
+ }
+
public int getRowKeyColumnIndex() {
return rowKeyColumnIndex;
}
+
public byte[] getFamily(int idx) {
return families[idx];
}
@@ -238,8 +259,10 @@ public class ImportTsv extends Configured implements Tool {
throw new BadTsvLineException("No timestamp");
} else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
throw new BadTsvLineException("No attributes specified");
- } else if(hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
+ } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
throw new BadTsvLineException("No cell visibility specified");
+ } else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) {
+ throw new BadTsvLineException("No cell TTL specified");
}
return new ParsedLine(tabOffsets, lineBytes);
}
@@ -336,6 +359,31 @@ public class ImportTsv extends Configured implements Tool {
}
}
+ public int getCellTTLColumnOffset() {
+ if (hasCellTTL()) {
+ return getColumnOffset(cellTTLColumnIndex);
+ } else {
+ return DEFAULT_CELL_TTL_COLUMN_INDEX;
+ }
+ }
+
+ public int getCellTTLColumnLength() {
+ if (hasCellTTL()) {
+ return getColumnLength(cellTTLColumnIndex);
+ } else {
+ return DEFAULT_CELL_TTL_COLUMN_INDEX;
+ }
+ }
+
+ public long getCellTTL() {
+ if (!hasCellTTL()) {
+ return 0;
+ } else {
+ return Bytes.toLong(lineBytes, getColumnOffset(cellTTLColumnIndex),
+ getColumnLength(cellTTLColumnIndex));
+ }
+ }
+
public int getColumnOffset(int idx) {
if (idx > 0)
return tabOffsets.get(idx - 1) + 1;
@@ -535,6 +583,7 @@ public class ImportTsv extends Configured implements Tool {
if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
|| TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
|| TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
+ || TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn)
|| TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
continue;
// we are only concerned with the first one (in case this is a cf:cq)
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
index 4a0e0fd..b3981a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import java.util.TreeSet;
@@ -28,8 +30,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;
@@ -62,6 +67,9 @@ public class TextSortReducer extends
/** Cell visibility expr **/
private String cellVisibilityExpr;
+ /** Cell TTL */
+ private long ttl;
+
private CellCreator kvCreator;
public long getTs() {
@@ -148,18 +156,30 @@ public class TextSortReducer extends
// Retrieve timestamp if exists
ts = parsed.getTimestamp(ts);
cellVisibilityExpr = parsed.getCellVisibility();
+ ttl = parsed.getCellTTL();
for (int i = 0; i < parsed.getColumnCount(); i++) {
if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
- || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
+ || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
+ || i == parser.getCellTTLColumnIndex()) {
continue;
}
// Creating the KV which needs to be directly written to HFiles. Using the Facade
// KVCreator for creation of kvs.
+ List<Tag> tags = new ArrayList<Tag>();
+ if (cellVisibilityExpr != null) {
+ tags.addAll(kvCreator.getVisibilityExpressionResolver()
+ .createVisibilityExpTags(cellVisibilityExpr));
+ }
+ // Add the TTL directly to the KV as a tag so that TTLs can differ between KVs
+ // when more than one KV is packed into a put
+ if (ttl > 0) {
+ tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+ }
Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
- parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
+ parsed.getColumnOffset(i), parsed.getColumnLength(i), tags);
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
kvs.add(kv);
curSize += kv.heapSize();
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
index ff84081..270de75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
@@ -18,17 +18,22 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
@@ -59,6 +64,8 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
protected String cellVisibilityExpr;
+ protected long ttl;
+
protected CellCreator kvCreator;
private String hfileOutPath;
@@ -144,11 +151,13 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
// Retrieve timestamp if exists
ts = parsed.getTimestamp(ts);
cellVisibilityExpr = parsed.getCellVisibility();
+ ttl = parsed.getCellTTL();
Put put = new Put(rowKey.copyBytes());
for (int i = 0; i < parsed.getColumnCount(); i++) {
if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
- || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
+ || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
+ || i == parser.getCellTTLColumnIndex()) {
continue;
}
populatePut(lineBytes, parsed, put, i);
@@ -192,13 +201,26 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
// the validation
put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
}
+ if (ttl > 0) {
+ put.setTTL(ttl);
+ }
} else {
// Creating the KV which needs to be directly written to HFiles. Using the Facade
// KVCreator for creation of kvs.
+ List<Tag> tags = new ArrayList<Tag>();
+ if (cellVisibilityExpr != null) {
+ tags.addAll(kvCreator.getVisibilityExpressionResolver()
+ .createVisibilityExpTags(cellVisibilityExpr));
+ }
+ // Add the TTL directly to the KV as a tag so that TTLs can differ between KVs
+ // when more than one KV is packed into a put
+ if (ttl > 0) {
+ tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+ }
cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
- parsed.getColumnLength(i), cellVisibilityExpr);
+ parsed.getColumnLength(i), tags);
}
put.add(cell);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
index fce3b29..4d22c0e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes;
/**
* State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}.
- * Like {@link ScanDeleteTracker} and {@link ScanDeleteTracker} but does not
+ * Like {@link ScanQueryMatcher} and {@link ScanDeleteTracker} but does not
* implement the {@link DeleteTracker} interface since state spans rows (There
* is no update nor reset method).
*/
@@ -42,7 +42,8 @@ import org.apache.hadoop.hbase.util.Bytes;
class GetClosestRowBeforeTracker {
private final KeyValue targetkey;
// Any cell w/ a ts older than this is expired.
- private final long oldestts;
+ private final long now;
+ private final long oldestUnexpiredTs;
private Cell candidate = null;
private final KVComparator kvcomparator;
// Flag for whether we're doing getclosest on a metaregion.
@@ -75,19 +76,12 @@ class GetClosestRowBeforeTracker {
HConstants.DELIMITER) - this.rowoffset;
}
this.tablenamePlusDelimiterLength = metaregion? l + 1: -1;
- this.oldestts = System.currentTimeMillis() - ttl;
+ this.now = System.currentTimeMillis();
+ this.oldestUnexpiredTs = now - ttl;
this.kvcomparator = c;
this.deletes = new TreeMap<Cell, NavigableSet<Cell>>(new CellComparator.RowComparator());
}
- /**
- * @param kv
- * @return True if this <code>kv</code> is expired.
- */
- boolean isExpired(final Cell kv) {
- return HStore.isExpired(kv, this.oldestts);
- }
-
/*
* Add the specified KeyValue to the list of deletes.
* @param kv
@@ -172,6 +166,15 @@ class GetClosestRowBeforeTracker {
return false;
}
+ /**
+ * @param cell the cell to check
+ * @return true if the cell is expired
+ */
+ public boolean isExpired(final Cell cell) {
+ return cell.getTimestamp() < this.oldestUnexpiredTs ||
+ HStore.isCellTTLExpired(cell, this.oldestUnexpiredTs, this.now);
+ }
+
/*
* Handle keys whose values hold deletes.
* Add to the set of deletes and then if the candidate keys contain any that
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 1007a88..f7243d5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -85,6 +85,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
@@ -2642,6 +2644,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
noOfDeletes++;
}
+ rewriteCellTags(familyMaps[i], mutation);
}
lock(this.updatesLock.readLock(), numReadyToWrite);
@@ -3116,6 +3119,59 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
+ /**
+ * Possibly rewrite incoming cell tags.
+ */
+ void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
+ // Check if we have any work to do and early out otherwise
+ // Update these checks as more logic is added here
+
+ if (m.getTTL() == Long.MAX_VALUE) {
+ return;
+ }
+
+ // From this point we know we have some work to do
+
+ for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
+ List<Cell> cells = e.getValue();
+ assert cells instanceof RandomAccess;
+ int listSize = cells.size();
+ for (int i = 0; i < listSize; i++) {
+ Cell cell = cells.get(i);
+ List<Tag> newTags = new ArrayList<Tag>();
+ Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
+ cell.getTagsOffset(), cell.getTagsLength());
+
+ // Carry forward existing tags
+
+ while (tagIterator.hasNext()) {
+
+ // Add any filters or tag specific rewrites here
+
+ newTags.add(tagIterator.next());
+ }
+
+ // Cell TTL handling
+
+ // Check again if we need to add a cell TTL because early out logic
+ // above may change when there are more tag based features in core.
+ if (m.getTTL() != Long.MAX_VALUE) {
+ // Add a cell TTL tag
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
+ }
+
+ // Rewrite the cell with the updated set of tags
+
+ cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+ cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+ cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+ cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
+ cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+ newTags));
+ }
+ }
+ }
+
/*
* Check if resources to support an update.
*
@@ -5213,6 +5269,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
processor.preBatchMutate(this, walEdit);
// 7. Apply to memstore
for (Mutation m : mutations) {
+ // Handle any tag based cell features
+ rewriteCellTags(m.getFamilyCellMap(), m);
+
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
Cell cell = cellScanner.current();
CellUtil.setSequenceId(cell, mvccNum);
@@ -5351,8 +5410,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
- // TODO: There's a lot of boiler plate code identical
- // to increment... See how to better unify that.
+ // TODO: There's a lot of boiler plate code identical to increment.
+ // We should refactor append and increment as local get-mutate-put
+ // transactions, so all stores only go through one code path for puts.
/**
* Perform one or more append operations on a row.
*
@@ -5422,8 +5482,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the append value
- // Avoid as much copying as possible. Every byte is copied at most
- // once.
+ // Avoid as much copying as possible. We may need to rewrite and
+ // consolidate tags. Bytes are only copied once.
// Would be nice if KeyValue had scatter/gather logic
int idx = 0;
for (Cell cell : family.getValue()) {
@@ -5433,40 +5493,87 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
&& CellUtil.matchingQualifier(results.get(idx), cell)) {
oldCell = results.get(idx);
long ts = Math.max(now, oldCell.getTimestamp());
- // allocate an empty kv once
+
+ // Process cell tags
+ List<Tag> newTags = new ArrayList<Tag>();
+
+ // Make a union of the set of tags in the old and new KVs
+
+ if (oldCell.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(oldCell.getTagsArray(),
+ oldCell.getTagsOffset(), oldCell.getTagsLength());
+ while (i.hasNext()) {
+ newTags.add(i.next());
+ }
+ }
+ if (cell.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ while (i.hasNext()) {
+ newTags.add(i.next());
+ }
+ }
+
+ // Cell TTL handling
+
+ if (append.getTTL() != Long.MAX_VALUE) {
+ // Add the new TTL tag
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+ }
+
+ // Rebuild tags
+ byte[] tagBytes = Tag.fromList(newTags);
+
+ // allocate an empty cell once
newCell = new KeyValue(row.length, cell.getFamilyLength(),
cell.getQualifierLength(), ts, KeyValue.Type.Put,
oldCell.getValueLength() + cell.getValueLength(),
- oldCell.getTagsLength() + cell.getTagsLength());
- // copy in the value
- System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
- newCell.getValueArray(), newCell.getValueOffset(),
- oldCell.getValueLength());
- System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
- newCell.getValueArray(),
- newCell.getValueOffset() + oldCell.getValueLength(),
- cell.getValueLength());
- // copy in the tags
- System.arraycopy(oldCell.getTagsArray(), oldCell.getTagsOffset(),
- newCell.getTagsArray(), newCell.getTagsOffset(), oldCell.getTagsLength());
- System.arraycopy(cell.getTagsArray(), cell.getTagsOffset(), newCell.getTagsArray(),
- newCell.getTagsOffset() + oldCell.getTagsLength(), cell.getTagsLength());
+ tagBytes.length);
// copy in row, family, and qualifier
System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
- newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
+ newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
- newCell.getFamilyArray(), newCell.getFamilyOffset(),
- cell.getFamilyLength());
+ newCell.getFamilyArray(), newCell.getFamilyOffset(),
+ cell.getFamilyLength());
System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
- newCell.getQualifierArray(), newCell.getQualifierOffset(),
- cell.getQualifierLength());
+ newCell.getQualifierArray(), newCell.getQualifierOffset(),
+ cell.getQualifierLength());
+ // copy in the value
+ System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
+ newCell.getValueArray(), newCell.getValueOffset(),
+ oldCell.getValueLength());
+ System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
+ newCell.getValueArray(),
+ newCell.getValueOffset() + oldCell.getValueLength(),
+ cell.getValueLength());
+ // Copy in tag data
+ System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
+ tagBytes.length);
idx++;
} else {
- // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
- // so only need to update the timestamp to 'now'
+ // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP
CellUtil.updateLatestStamp(cell, now);
- newCell = cell;
- }
+
+ // Cell TTL handling
+
+ if (append.getTTL() != Long.MAX_VALUE) {
+ List<Tag> newTags = new ArrayList<Tag>(1);
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+ // Add the new TTL tag
+ newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
+ cell.getRowLength(),
+ cell.getFamilyArray(), cell.getFamilyOffset(),
+ cell.getFamilyLength(),
+ cell.getQualifierArray(), cell.getQualifierOffset(),
+ cell.getQualifierLength(),
+ cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
+ cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+ newTags);
+ } else {
+ newCell = cell;
+ }
+ }
+
CellUtil.setSequenceId(newCell, mvccNum);
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
@@ -5570,6 +5677,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
+ // TODO: There's a lot of boiler plate code identical to append.
+ // We should refactor append and increment as local get-mutate-put
+ // transactions, so all stores only go through one code path for puts.
/**
* Perform one or more increment operations on a row.
* @return new keyvalues after increment
@@ -5642,13 +5752,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the increment amount
int idx = 0;
- for (Cell kv: family.getValue()) {
- long amount = Bytes.toLong(CellUtil.cloneValue(kv));
+ for (Cell cell: family.getValue()) {
+ long amount = Bytes.toLong(CellUtil.cloneValue(cell));
boolean noWriteBack = (amount == 0);
+ List<Tag> newTags = new ArrayList<Tag>();
+
+ // Carry forward any tags that might have been added by a coprocessor
+ if (cell.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
+ cell.getTagsOffset(), cell.getTagsLength());
+ while (i.hasNext()) {
+ newTags.add(i.next());
+ }
+ }
Cell c = null;
long ts = now;
- if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
+ if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
c = results.get(idx);
ts = Math.max(now, c.getTimestamp());
if(c.getValueLength() == Bytes.SIZEOF_LONG) {
@@ -5658,32 +5778,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
throw new org.apache.hadoop.hbase.DoNotRetryIOException(
"Attempted to increment field that isn't 64 bits wide");
}
+ // Carry tags forward from previous version
+ if (c.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(c.getTagsArray(),
+ c.getTagsOffset(), c.getTagsLength());
+ while (i.hasNext()) {
+ newTags.add(i.next());
+ }
+ }
idx++;
}
// Append new incremented KeyValue to list
- byte[] q = CellUtil.cloneQualifier(kv);
+ byte[] q = CellUtil.cloneQualifier(cell);
byte[] val = Bytes.toBytes(amount);
- int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
- int incCellTagsLen = kv.getTagsLength();
- Cell newKV = new KeyValue(row.length, family.getKey().length, q.length, ts,
- KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
- System.arraycopy(row, 0, newKV.getRowArray(), newKV.getRowOffset(), row.length);
- System.arraycopy(family.getKey(), 0, newKV.getFamilyArray(), newKV.getFamilyOffset(),
- family.getKey().length);
- System.arraycopy(q, 0, newKV.getQualifierArray(), newKV.getQualifierOffset(), q.length);
- // copy in the value
- System.arraycopy(val, 0, newKV.getValueArray(), newKV.getValueOffset(), val.length);
- // copy tags
- if (oldCellTagsLen > 0) {
- System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getTagsArray(),
- newKV.getTagsOffset(), oldCellTagsLen);
- }
- if (incCellTagsLen > 0) {
- System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
- newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
+
+ // Add the TTL tag if the mutation carried one
+ if (increment.getTTL() != Long.MAX_VALUE) {
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
}
+
+ Cell newKV = new KeyValue(row, 0, row.length,
+ family.getKey(), 0, family.getKey().length,
+ q, 0, q.length,
+ ts,
+ KeyValue.Type.Put,
+ val, 0, val.length,
+ newTags);
+
CellUtil.setSequenceId(newKV, mvccNum);
+
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = coprocessorHost.postMutationBeforeWAL(
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 3811142..98b79fa 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
@@ -1674,8 +1676,38 @@ public class HStore implements Store {
return wantedVersions > maxVersions ? maxVersions: wantedVersions;
}
- static boolean isExpired(final Cell key, final long oldestTimestamp) {
- return key.getTimestamp() < oldestTimestamp;
+ /** Reports whether a cell has outlived the first TTL tag it carries, if it has one.
+ * @param cell the cell whose tags are searched for a TTL tag
+ * @param oldestTimestamp not read anywhere in this method body
+ * @param now the time that the cell's timestamp plus its TTL is compared against
+ * @return true only when a TTL tag is found and timestamp + TTL is earlier than now */
+ static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
+ // Do not create an Iterator or Tag objects unless the cell actually has
+ // tags
+ if (cell.getTagsLength() > 0) {
+ // Look for a TTL tag. Only the first TTL tag encountered is consulted; if
+ // a cell has multiple TTL tags, the later ones are ignored. Family-level
+ // settings are not read in this method.
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ if (TagType.TTL_TAG_TYPE == t.getType()) {
+ // Unlike in the schema, cell TTLs are stored in milliseconds, so there
+ // is no need to convert
+ long ts = cell.getTimestamp();
+ assert t.getTagLength() == Bytes.SIZEOF_LONG;
+ long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
+ if (ts + ttl < now) {
+ return true;
+ }
+ // Not expired by its cell TTL: stop at this first TTL tag and return false
+ // below. NOTE(review): family TTL is presumably enforced by callers - confirm.
+ break;
+ }
+ }
+ }
+ return false;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index 56e6b8a..c46da2a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -102,6 +102,10 @@ public class ScanQueryMatcher {
private final long earliestPutTs;
private final long ttl;
+ /** The oldest timestamp we are interested in, based on TTL */
+ private final long oldestUnexpiredTS;
+ private final long now;
+
/** readPoint over which the KVs are unconditionally included */
protected long maxReadPointToTrackVersions;
@@ -154,7 +158,7 @@ public class ScanQueryMatcher {
*/
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
- RegionCoprocessorHost regionCoprocessorHost) throws IOException {
+ long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
this.tr = scan.getTimeRange();
this.rowComparator = scanInfo.getComparator();
this.regionCoprocessorHost = regionCoprocessorHost;
@@ -164,6 +168,9 @@ public class ScanQueryMatcher {
scanInfo.getFamily());
this.filter = scan.getFilter();
this.earliestPutTs = earliestPutTs;
+ this.oldestUnexpiredTS = oldestUnexpiredTS;
+ this.now = now;
+
this.maxReadPointToTrackVersions = readPointToUse;
this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
this.ttl = oldestUnexpiredTS;
@@ -218,18 +225,18 @@ public class ScanQueryMatcher {
* @param scanInfo The store's immutable scan info
* @param columns
* @param earliestPutTs Earliest put seen in any of the store files.
- * @param oldestUnexpiredTS the oldest timestamp we are interested in,
- * based on TTL
+ * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
+ * @param now the current server time
* @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
* @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
* @param regionCoprocessorHost
* @throws IOException
*/
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
- long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, byte[] dropDeletesFromRow,
+ long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs,
- oldestUnexpiredTS, regionCoprocessorHost);
+ oldestUnexpiredTS, now, regionCoprocessorHost);
Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null));
this.dropDeletesFromRow = dropDeletesFromRow;
this.dropDeletesToRow = dropDeletesToRow;
@@ -239,10 +246,10 @@ public class ScanQueryMatcher {
* Constructor for tests
*/
ScanQueryMatcher(Scan scan, ScanInfo scanInfo,
- NavigableSet<byte[]> columns, long oldestUnexpiredTS) throws IOException {
+ NavigableSet<byte[]> columns, long oldestUnexpiredTS, long now) throws IOException {
this(scan, scanInfo, columns, ScanType.USER_SCAN,
Long.MAX_VALUE, /* max Readpoint to track versions */
- HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, null);
+ HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, null);
}
/**
@@ -300,12 +307,17 @@ public class ScanQueryMatcher {
int qualifierOffset = cell.getQualifierOffset();
int qualifierLength = cell.getQualifierLength();
+
long timestamp = cell.getTimestamp();
// check for early out based on timestamp alone
if (columns.isDone(timestamp)) {
return columns.getNextRowOrNextColumn(cell.getQualifierArray(), qualifierOffset,
qualifierLength);
}
+ // check if the cell is expired by cell TTL
+ if (HStore.isCellTTLExpired(cell, this.oldestUnexpiredTS, this.now)) {
+ return MatchCode.SKIP;
+ }
/*
* The delete logic is pretty complicated now.
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index ab7370c..9db116e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -77,6 +77,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected final Scan scan;
protected final NavigableSet<byte[]> columns;
protected final long oldestUnexpiredTS;
+ protected final long now;
protected final int minVersions;
protected final long maxRowSize;
@@ -123,7 +124,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
explicitColumnQuery = numCol > 0;
this.scan = scan;
this.columns = columns;
- oldestUnexpiredTS = EnvironmentEdgeManager.currentTime() - ttl;
+ this.now = EnvironmentEdgeManager.currentTime();
+ this.oldestUnexpiredTS = now - ttl;
this.minVersions = minVersions;
if (store != null && ((HStore)store).getHRegion() != null
@@ -176,7 +178,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
matcher = new ScanQueryMatcher(scan, scanInfo, columns,
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
- oldestUnexpiredTS, store.getCoprocessorHost());
+ oldestUnexpiredTS, now, store.getCoprocessorHost());
this.store.addChangedReaderObserver(this);
@@ -241,10 +243,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
if (dropDeletesFromRow == null) {
matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
- earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost());
+ earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
} else {
matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
- oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
+ oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
}
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
@@ -284,7 +286,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
scanInfo.getMinVersions(), readPt);
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
- Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null);
+ Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
// In unit tests, the store could be null
if (this.store != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
new file mode 100644
index 0000000..a5cceb0
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.MapReduceTests;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({MapReduceTests.class, LargeTests.class})
+public class TestImportTSVWithTTLs implements Configurable {
+ // Runs ImportTsv with an HBASE_CELL_TTL column on a mini cluster; TTLCheckingObserver fails Puts without a TTL.
+ protected static final Log LOG = LogFactory.getLog(TestImportTSVWithTTLs.class);
+ protected static final String NAME = TestImportTsv.class.getSimpleName(); // NOTE(review): uses TestImportTsv's name, not this class's - confirm intended
+ protected static HBaseTestingUtility util = new HBaseTestingUtility();
+
+ /**
+ * Delete the tmp directory after running doMROnTableTest. Boolean.
+ * doMROnTableTest reads this key with a default of true.
+ */
+ protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
+
+ /**
+ * Force use of combiner in doMROnTableTest. Boolean. Default is true.
+ */
+ protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
+
+ private final String FAMILY = "FAM";
+ private static Configuration conf;
+
+ @Override
+ public Configuration getConf() {
+ return util.getConfiguration();
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ throw new IllegalArgumentException("setConf not supported");
+ }
+
+ @BeforeClass
+ public static void provisionCluster() throws Exception {
+ conf = util.getConfiguration();
+ // We don't check persistence in HFiles in this test, but if we ever do we will
+ // need this where the default hfile version is not 3 (i.e. 0.98)
+ conf.setInt("hfile.format.version", 3);
+ conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName());
+ util.startMiniCluster();
+ util.startMiniMapReduceCluster();
+ }
+
+ @AfterClass
+ public static void releaseCluster() throws Exception {
+ util.shutdownMiniMapReduceCluster();
+ util.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testMROnTable() throws Exception {
+ String tableName = "test-" + UUID.randomUUID();
+
+ // Prepare the arguments required for the test: the last TSV column is mapped to HBASE_CELL_TTL.
+ String[] args = new String[] {
+ "-D" + ImportTsv.MAPPER_CONF_KEY
+ + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
+ "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
+ "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
+ String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n";
+ util.createTable(TableName.valueOf(tableName), FAMILY);
+ doMROnTableTest(util, FAMILY, data, args, 1);
+ util.deleteTable(tableName);
+ }
+ // Writes data to input.dat, runs ImportTsv on it and asserts exit code 0; valueMultiplier is not read here.
+ protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+ String[] args, int valueMultiplier) throws Exception {
+ TableName table = TableName.valueOf(args[args.length - 1]);
+ Configuration conf = new Configuration(util.getConfiguration());
+
+ // populate input file
+ FileSystem fs = FileSystem.get(conf);
+ Path inputPath = fs.makeQualified(new Path(util
+ .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
+ FSDataOutputStream op = fs.create(inputPath, true);
+ op.write(Bytes.toBytes(data));
+ op.close();
+ LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+ if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+ LOG.debug("Forcing combiner.");
+ conf.setInt("mapreduce.map.combine.minspills", 1);
+ }
+
+ // run the import, with the input file path appended as the final argument
+ List<String> argv = new ArrayList<String>(Arrays.asList(args));
+ argv.add(inputPath.toString());
+ Tool tool = new ImportTsv();
+ LOG.debug("Running ImportTsv with arguments: " + argv);
+ try {
+ // Job will fail if observer rejects entries without TTL
+ assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
+ } finally {
+ // Clean up
+ if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+ LOG.debug("Deleting test subdirectory");
+ util.cleanupDataTestDirOnTestFS(table.getNameAsString());
+ }
+ }
+
+ return tool;
+ }
+ // Region observer that fails any Put on a non-meta, non-system table whose getTTL() is Long.MAX_VALUE.
+ public static class TTLCheckingObserver extends BaseRegionObserver {
+
+ @Override
+ public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
+ Durability durability) throws IOException {
+ HRegion region = e.getEnvironment().getRegion();
+ if (!region.getRegionInfo().isMetaTable()
+ && !region.getRegionInfo().getTable().isSystemTable()) {
+ // The put carries the TTL attribute
+ if (put.getTTL() != Long.MAX_VALUE) {
+ return;
+ }
+ throw new IOException("Operation does not have TTL set");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index ffaa347..7c06eca 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -90,6 +91,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
@@ -111,6 +113,7 @@ import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -140,6 +143,7 @@ import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
@@ -5767,6 +5771,133 @@ public class TestHRegion {
}
}
+ @Test
+ public void testCellTTLs() throws IOException {
+ IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
+ EnvironmentEdgeManager.injectEdge(edge); // NOTE(review): no reset of the injected edge is visible in this method
+ // Time below is advanced through the injected edge; T+n in the comments is relative to "now" taken below.
+ final byte[] row = Bytes.toBytes("testRow");
+ final byte[] q1 = Bytes.toBytes("q1");
+ final byte[] q2 = Bytes.toBytes("q2");
+ final byte[] q3 = Bytes.toBytes("q3");
+ final byte[] q4 = Bytes.toBytes("q4");
+
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testCellTTLs"));
+ HColumnDescriptor hcd = new HColumnDescriptor(fam1);
+ hcd.setTimeToLive(10); // 10 seconds
+ htd.addFamily(hcd);
+
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
+
+ HRegion region = HRegion.createHRegion(new HRegionInfo(htd.getTableName(),
+ HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
+ TEST_UTIL.getDataTestDir(), conf, htd);
+ assertNotNull(region);
+ try {
+ long now = EnvironmentEdgeManager.currentTime();
+ // Add a cell that will expire in 5 seconds via cell TTL
+ region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
+ HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+ // TTL tags specify the TTL in milliseconds
+ new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+ // Add a cell that will expire after 10 seconds via family setting
+ region.put(new Put(row).add(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
+ // Add a cell that will expire in 15 seconds via cell TTL (timestamp just under T+10, plus 5 seconds)
+ region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
+ HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+ // TTL tags specify the TTL in milliseconds
+ new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+ // Add a cell that will expire in 20 seconds via family setting (timestamp just under T+10, plus 10 seconds)
+ region.put(new Put(row).add(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
+
+ // Flush so we are sure store scanning gets this right
+ region.flushcache();
+
+ // A query at time T+0 should return all cells
+ Result r = region.get(new Get(row));
+ assertNotNull(r.getValue(fam1, q1));
+ assertNotNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+5 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNotNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+10 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+15 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time by 10 seconds to about T+25, past q4's expiry at about T+20
+ edge.incrementTime(10000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNull(r.getValue(fam1, q3));
+ assertNull(r.getValue(fam1, q4));
+
+ // Fun with disappearing increments
+
+ // Start at 1
+ region.put(new Put(row).add(fam1, q1, Bytes.toBytes(1L)));
+ r = region.get(new Get(row));
+ byte[] val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(Bytes.toLong(val), 1L);
+
+ // Increment with a TTL of 5 seconds
+ Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
+ incr.setTTL(5000);
+ region.increment(incr); // 2
+
+ // New value should be 2
+ r = region.get(new Get(row));
+ val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(Bytes.toLong(val), 2L);
+
+ // Increment time by 5 seconds (to about T+30) so the increment's 5 second TTL elapses
+ edge.incrementTime(5000);
+
+ // Value should be back to 1
+ r = region.get(new Get(row));
+ val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(Bytes.toLong(val), 1L);
+
+ // Increment time by another 5 seconds (to about T+35)
+ edge.incrementTime(5000);
+
+ // Original value written at about T+25 should be gone now via family TTL
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+
+ } finally {
+ HRegion.closeHRegion(region);
+ }
+ }
+
private static HRegion initHRegion(byte[] tableName, String callingMethod,
byte[]... families) throws IOException {
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
index 97c66e4..6476288 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
@@ -95,11 +95,12 @@ public class TestQueryMatcher extends HBaseTestCase {
}
- private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
- // 2,4,5
+ private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
+ long now = EnvironmentEdgeManager.currentTime();
+ // 2,4,5
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), get.getFamilyMap().get(fam2),
- EnvironmentEdgeManager.currentTime() - ttl);
+ now - ttl, now);
List<KeyValue> memstore = new ArrayList<KeyValue>();
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -182,9 +183,10 @@ public class TestQueryMatcher extends HBaseTestCase {
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.DONE);
+ long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null,
- EnvironmentEdgeManager.currentTime() - ttl);
+ now - ttl, now);
List<KeyValue> memstore = new ArrayList<KeyValue>();
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -238,8 +240,8 @@ public class TestQueryMatcher extends HBaseTestCase {
long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm =
- new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
- rowComparator), get.getFamilyMap().get(fam2), now - testTTL);
+ new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
+ rowComparator), get.getFamilyMap().get(fam2), now - testTTL, now);
KeyValue [] kvs = new KeyValue[] {
new KeyValue(row1, fam2, col1, now-100, data),
@@ -294,7 +296,7 @@ public class TestQueryMatcher extends HBaseTestCase {
long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null,
- now - testTTL);
+ now - testTTL, now);
KeyValue [] kvs = new KeyValue[] {
new KeyValue(row1, fam2, col1, now-100, data),
@@ -353,7 +355,7 @@ public class TestQueryMatcher extends HBaseTestCase {
NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,
- HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to, null);
+ HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, now, from, to, null);
List<ScanQueryMatcher.MatchCode> actual =
new ArrayList<ScanQueryMatcher.MatchCode>(rows.length);
byte[] prevRow = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
index daee137..c0dcee6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
-import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category({RegionServerTests.class, SmallTests.class})
@@ -36,7 +35,6 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
final static int VERSIONS = 2;
- @Test
public void testCheckColumn_Ok() throws IOException {
ScanWildcardColumnTracker tracker =
new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
@@ -70,7 +68,6 @@ public class TestScanWildcardColumnTracker extends HBaseTestCase {
}
}
- @Test
public void testCheckColumn_EnforceVersions() throws IOException {
ScanWildcardColumnTracker tracker =
new ScanWildcardColumnTracker(0, VERSIONS, Long.MIN_VALUE);
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
index b982977..eaea83e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
@@ -419,8 +419,13 @@ public class TestTags {
tags = TestCoprocessorForTags.tags;
assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
assertEquals(2, tags.size());
- assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
- assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+ // We cannot assume the ordering of tags
+ List<String> tagValues = new ArrayList<String>();
+ for (Tag tag: tags) {
+ tagValues.add(Bytes.toString(tag.getValue()));
+ }
+ assertTrue(tagValues.contains("tag1"));
+ assertTrue(tagValues.contains("tag2"));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
@@ -476,8 +481,13 @@ public class TestTags {
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
tags = TestCoprocessorForTags.tags;
assertEquals(2, tags.size());
- assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
- assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+ // We cannot assume the ordering of tags
+ tagValues.clear();
+ for (Tag tag: tags) {
+ tagValues.add(Bytes.toString(tag.getValue()));
+ }
+ assertTrue(tagValues.contains("tag1"));
+ assertTrue(tagValues.contains("tag2"));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-shell/src/main/ruby/hbase/table.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/table.rb b/hbase-shell/src/main/ruby/hbase/table.rb
index 46aca19..d53a82e 100644
--- a/hbase-shell/src/main/ruby/hbase/table.rb
+++ b/hbase-shell/src/main/ruby/hbase/table.rb
@@ -141,17 +141,19 @@ EOF
set_attributes(p, attributes) if attributes
visibility = args[VISIBILITY]
set_cell_visibility(p, visibility) if visibility
+ ttl = args[TTL]
+ set_op_ttl(p, ttl) if ttl
end
#Case where attributes are specified without timestamp
if timestamp.kind_of?(Hash)
timestamp.each do |k, v|
- if v.kind_of?(Hash)
- set_attributes(p, v) if v
- end
- if v.kind_of?(String)
- set_cell_visibility(p, v) if v
- end
-
+ if k == 'ATTRIBUTES'
+ set_attributes(p, v)
+ elsif k == 'VISIBILITY'
+ set_cell_visibility(p, v)
+ elsif k == "TTL"
+ set_op_ttl(p, v)
+ end
end
timestamp = nil
end
@@ -219,6 +221,8 @@ EOF
visibility = args[VISIBILITY]
set_attributes(incr, attributes) if attributes
set_cell_visibility(incr, visibility) if visibility
+ ttl = args[TTL]
+ set_op_ttl(incr, ttl) if ttl
end
incr.addColumn(family, qualifier, value)
@table.increment(incr)
@@ -237,6 +241,8 @@ EOF
visibility = args[VISIBILITY]
set_attributes(append, attributes) if attributes
set_cell_visibility(append, visibility) if visibility
+ ttl = args[TTL]
+ set_op_ttl(append, ttl) if ttl
end
append.add(family, qualifier, value.to_s.to_java_bytes)
@table.append(append)
@@ -545,6 +551,10 @@ EOF
auths.to_java(:string)))
end
+ def set_op_ttl(op, ttl)
+ op.setTTL(ttl.to_java(:long))
+ end
+
#----------------------------
# Add general administration utilities to the shell
# each of the names below adds this method name to the table
http://git-wip-us.apache.org/repos/asf/hbase/blob/09cd3d7b/hbase-shell/src/test/ruby/hbase/table_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/table_test.rb b/hbase-shell/src/test/ruby/hbase/table_test.rb
index 7272229..fa2990d 100644
--- a/hbase-shell/src/test/ruby/hbase/table_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/table_test.rb
@@ -530,5 +530,18 @@ module Hbase
end
end
+ define_test "mutation with TTL should expire" do
+ @test_table.put('ttlTest', 'x:a', 'foo', { TTL => 1000 } )
+ begin
+ res = @test_table._get_internal('ttlTest', 'x:a')
+ assert_not_nil(res)
+ sleep 2
+ res = @test_table._get_internal('ttlTest', 'x:a')
+ assert_nil(res)
+ ensure
+ @test_table.delete('ttlTest', 'x:a')
+ end
+ end
+
end
end
[2/3] hbase git commit: HBASE-10560 Per cell TTLs
Posted by ap...@apache.org.
HBASE-10560 Per cell TTLs
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/004e977b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/004e977b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/004e977b
Branch: refs/heads/branch-1
Commit: 004e977ba0009fc8978d842eb65933a62fdf1173
Parents: 1bd27bf
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Dec 5 11:00:19 2014 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Dec 5 11:11:15 2014 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/Append.java | 5 +
.../org/apache/hadoop/hbase/client/Delete.java | 5 +
.../apache/hadoop/hbase/client/Increment.java | 5 +
.../apache/hadoop/hbase/client/Mutation.java | 34 +++
.../org/apache/hadoop/hbase/client/Put.java | 5 +
.../java/org/apache/hadoop/hbase/TagType.java | 1 +
.../hadoop/hbase/mapreduce/CellCreator.java | 35 ++-
.../hadoop/hbase/mapreduce/ImportTsv.java | 55 ++++-
.../hadoop/hbase/mapreduce/TextSortReducer.java | 24 +-
.../hbase/mapreduce/TsvImporterMapper.java | 26 ++-
.../GetClosestRowBeforeTracker.java | 25 ++-
.../hadoop/hbase/regionserver/HRegion.java | 224 ++++++++++++++-----
.../hadoop/hbase/regionserver/HStore.java | 36 ++-
.../hbase/regionserver/ScanQueryMatcher.java | 26 ++-
.../hadoop/hbase/regionserver/StoreScanner.java | 12 +-
.../hbase/mapreduce/TestImportTSVWithTTLs.java | 172 ++++++++++++++
.../hadoop/hbase/regionserver/TestHRegion.java | 131 +++++++++++
.../hbase/regionserver/TestQueryMatcher.java | 18 +-
.../hadoop/hbase/regionserver/TestTags.java | 18 +-
hbase-shell/src/main/ruby/hbase/table.rb | 24 +-
hbase-shell/src/test/ruby/hbase/table_test.rb | 13 ++
21 files changed, 792 insertions(+), 102 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
index 66e8d32..58c204b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Append.java
@@ -183,4 +183,9 @@ public class Append extends Mutation {
public Append setACL(Map<String, Permission> perms) {
return (Append) super.setACL(perms);
}
+
+ @Override
+ public Append setTTL(long ttl) {
+ // Mutation#setTTL records the attribute and returns this same instance,
+ // so the narrowed type can be returned without a cast.
+ super.setTTL(ttl);
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
index 7447147..4bbcb27 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
@@ -473,4 +473,9 @@ public class Delete extends Mutation implements Comparable<Row> {
public Delete setACL(Map<String, Permission> perms) {
return (Delete) super.setACL(perms);
}
+
+ /**
+ * TTLs are not accepted on Deletes: this override never delegates to the
+ * superclass and always throws.
+ * @param ttl ignored
+ * @return never returns normally
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public Delete setTTL(long ttl) {
+ throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported");
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
index e2d8387..af0ea56 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Increment.java
@@ -329,4 +329,9 @@ public class Increment extends Mutation implements Comparable<Row> {
public Increment setACL(Map<String, Permission> perms) {
return (Increment) super.setACL(perms);
}
+
+ @Override
+ public Increment setTTL(long ttl) {
+ // The superclass stores the TTL attribute and hands back this instance;
+ // return it directly as an Increment.
+ super.setTTL(ttl);
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index 1ebff93..dbc1317 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -78,6 +78,11 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
*/
private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
+ /**
+ * The attribute for storing TTL for the result of the mutation.
+ */
+ private static final String OP_ATTRIBUTE_TTL = "_ttl";
+
protected byte [] row = null;
protected long ts = HConstants.LATEST_TIMESTAMP;
protected Durability durability = Durability.USE_DEFAULT;
@@ -201,6 +206,12 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
if (getId() != null) {
map.put("id", getId());
}
+ // Add the TTL if set
+ // Long.MAX_VALUE is the default, and is interpreted to mean this attribute
+ // has not been set.
+ if (getTTL() != Long.MAX_VALUE) {
+ map.put("ttl", getTTL());
+ }
return map;
}
@@ -474,6 +485,29 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
}
/**
+ * Return the TTL requested for the result of the mutation, in milliseconds.
+ * @return the TTL requested for the result of the mutation, in milliseconds,
+ * or Long.MAX_VALUE if unset
+ */
+ public long getTTL() {
+ // The TTL is kept as a serialized long in the operation attributes;
+ // an absent attribute means no TTL was set and maps to Long.MAX_VALUE.
+ final byte[] encoded = getAttribute(OP_ATTRIBUTE_TTL);
+ return encoded == null ? Long.MAX_VALUE : Bytes.toLong(encoded);
+ }
+
+ /**
+ * Set the TTL desired for the result of the mutation, in milliseconds.
+ * @param ttl the TTL desired for the result of the mutation, in milliseconds
+ * @return this
+ */
+ public Mutation setTTL(long ttl) {
+ // Serialize the TTL and store it as an operation attribute so getTTL() can read it back.
+ final byte[] encoded = Bytes.toBytes(ttl);
+ setAttribute(OP_ATTRIBUTE_TTL, encoded);
+ return this;
+ }
+
+ /**
* Subclasses should override this method to add the heap size of their own fields.
* @return the heap size to add (will be aligned).
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
index 20d445a..0fb0118 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Put.java
@@ -472,4 +472,9 @@ public class Put extends Mutation implements HeapSize, Comparable<Row> {
public Put setACL(Map<String, Permission> perms) {
return (Put) super.setACL(perms);
}
+
+ @Override
+ public Put setTTL(long ttl) {
+ // Delegate storage of the TTL to Mutation, which returns this instance;
+ // hand it back typed as a Put.
+ super.setTTL(ttl);
+ return this;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index f088dcf..2095b7a 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -30,4 +30,5 @@ public final class TagType {
public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
// String based tag type used in replication
public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
+ public static final byte TTL_TAG_TYPE = (byte)8;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
index b3dfee7..001f64d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
@@ -69,7 +69,7 @@ public class CellCreator {
byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
int vlength) throws IOException {
return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength,
- timestamp, value, voffset, vlength, null);
+ timestamp, value, voffset, vlength, (List<Tag>)null);
}
/**
@@ -90,6 +90,7 @@ public class CellCreator {
* @return created Cell
* @throws IOException
*/
+ @Deprecated
public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
int vlength, String visExpression) throws IOException {
@@ -100,4 +101,36 @@ public class CellCreator {
return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags);
}
+
+ /**
+ * Builds a Put-type KeyValue from the given row, family, qualifier and value
+ * slices, attaching the supplied tags unchanged.
+ * @param row row key
+ * @param roffset row offset
+ * @param rlength row length
+ * @param family family name
+ * @param foffset family offset
+ * @param flength family length
+ * @param qualifier column qualifier
+ * @param qoffset qualifier offset
+ * @param qlength qualifier length
+ * @param timestamp version timestamp
+ * @param value column value
+ * @param voffset value offset
+ * @param vlength value length
+ * @param tags tags to attach to the cell; handed to the KeyValue constructor as-is
+ * @return created Cell
+ * @throws IOException
+ */
+ public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+ byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+ int vlength, List<Tag> tags) throws IOException {
+ final KeyValue created = new KeyValue(row, roffset, rlength, family, foffset, flength,
+ qualifier, qoffset, qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags);
+ return created;
+ }
+
+ /**
+ * Accessor for the resolver held by this creator.
+ * @return the visibility expression resolver stored in this instance
+ */
+ public VisibilityExpressionResolver getVisibilityExpressionResolver() {
+ return visExpResolver;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index 7ae6599..54e0034 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -125,13 +125,20 @@ public class ImportTsv extends Configured implements Tool {
public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
+ public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL";
+
private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
+ public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1;
+
private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+
+ private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX;
+
/**
* @param columnsSpecification the list of columns to parser out, comma separated.
* The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
@@ -162,14 +169,18 @@ public class ImportTsv extends Configured implements Tool {
timestampKeyColumnIndex = i;
continue;
}
- if(ATTRIBUTES_COLUMN_SPEC.equals(str)) {
+ if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
attrKeyColumnIndex = i;
continue;
}
- if(CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
+ if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
cellVisibilityColumnIndex = i;
continue;
}
+ if (CELL_TTL_COLUMN_SPEC.equals(str)) {
+ cellTTLColumnIndex = i;
+ continue;
+ }
String[] parts = str.split(":", 2);
if (parts.length == 1) {
families[i] = str.getBytes();
@@ -197,6 +208,10 @@ public class ImportTsv extends Configured implements Tool {
return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
}
+ /**
+ * @return true if the column specification named a cell TTL column, i.e. the
+ * TTL column index is no longer at its default value
+ */
+ public boolean hasCellTTL() {
+ // Compare against the TTL default rather than the cell-visibility default:
+ // both are -1, but they are independent constants.
+ return cellTTLColumnIndex != DEFAULT_CELL_TTL_COLUMN_INDEX;
+ }
+
public int getAttributesKeyColumnIndex() {
return attrKeyColumnIndex;
}
@@ -204,9 +219,15 @@ public class ImportTsv extends Configured implements Tool {
public int getCellVisibilityColumnIndex() {
return cellVisibilityColumnIndex;
}
+
+ /**
+ * @return index of the cell TTL column in the column specification, or
+ * DEFAULT_CELL_TTL_COLUMN_INDEX if none was given
+ */
+ public int getCellTTLColumnIndex() {
+ return this.cellTTLColumnIndex;
+ }
+
public int getRowKeyColumnIndex() {
return rowKeyColumnIndex;
}
+
public byte[] getFamily(int idx) {
return families[idx];
}
@@ -238,8 +259,10 @@ public class ImportTsv extends Configured implements Tool {
throw new BadTsvLineException("No timestamp");
} else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
throw new BadTsvLineException("No attributes specified");
- } else if(hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
+ } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
throw new BadTsvLineException("No cell visibility specified");
+ } else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) {
+ throw new BadTsvLineException("No cell TTL specified");
}
return new ParsedLine(tabOffsets, lineBytes);
}
@@ -336,6 +359,31 @@ public class ImportTsv extends Configured implements Tool {
}
}
+ public int getCellTTLColumnOffset() {
+ // With no TTL column configured there is no offset to report; the default
+ // index value doubles as the "absent" marker.
+ if (!hasCellTTL()) {
+ return DEFAULT_CELL_TTL_COLUMN_INDEX;
+ }
+ return getColumnOffset(cellTTLColumnIndex);
+ }
+
+ public int getCellTTLColumnLength() {
+ // Length of the TTL column in this line, or the default index value when
+ // no TTL column is configured.
+ return hasCellTTL() ? getColumnLength(cellTTLColumnIndex) : DEFAULT_CELL_TTL_COLUMN_INDEX;
+ }
+
+ /**
+ * Read the cell TTL from this line.
+ * @return the value of the cell TTL column, or 0 if no TTL column is configured
+ * @throws NumberFormatException if the column text is not a decimal long
+ */
+ public long getCellTTL() {
+ if (!hasCellTTL()) {
+ return 0;
+ }
+ // The line is text, so the column holds decimal digits. Bytes.toLong() decodes
+ // an 8-byte binary long and rejects any other length, so the column is
+ // converted to a String and parsed instead.
+ return Long.parseLong(Bytes.toString(lineBytes, getColumnOffset(cellTTLColumnIndex),
+ getColumnLength(cellTTLColumnIndex)));
+ }
+
public int getColumnOffset(int idx) {
if (idx > 0)
return tabOffsets.get(idx - 1) + 1;
@@ -535,6 +583,7 @@ public class ImportTsv extends Configured implements Tool {
if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
|| TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
|| TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
+ || TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn)
|| TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
continue;
// we are only concerned with the first one (in case this is a cf:cq)
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
index 4a0e0fd..b3981a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import java.util.TreeSet;
@@ -28,8 +30,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;
@@ -62,6 +67,9 @@ public class TextSortReducer extends
/** Cell visibility expr **/
private String cellVisibilityExpr;
+ /** Cell TTL */
+ private long ttl;
+
private CellCreator kvCreator;
public long getTs() {
@@ -148,18 +156,30 @@ public class TextSortReducer extends
// Retrieve timestamp if exists
ts = parsed.getTimestamp(ts);
cellVisibilityExpr = parsed.getCellVisibility();
+ ttl = parsed.getCellTTL();
for (int i = 0; i < parsed.getColumnCount(); i++) {
if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
- || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
+ || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
+ || i == parser.getCellTTLColumnIndex()) {
continue;
}
// Creating the KV which needs to be directly written to HFiles. Using the Facade
// KVCreator for creation of kvs.
+ List<Tag> tags = new ArrayList<Tag>();
+ if (cellVisibilityExpr != null) {
+ tags.addAll(kvCreator.getVisibilityExpressionResolver()
+ .createVisibilityExpTags(cellVisibilityExpr));
+ }
+ // Add TTL directly to the KV so we can vary them when packing more than one KV
+ // into puts
+ if (ttl > 0) {
+ tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+ }
Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
- parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
+ parsed.getColumnOffset(i), parsed.getColumnLength(i), tags);
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
kvs.add(kv);
curSize += kv.heapSize();
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
index ff84081..270de75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
@@ -18,17 +18,22 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
@@ -59,6 +64,8 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
protected String cellVisibilityExpr;
+ protected long ttl;
+
protected CellCreator kvCreator;
private String hfileOutPath;
@@ -144,11 +151,13 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
// Retrieve timestamp if exists
ts = parsed.getTimestamp(ts);
cellVisibilityExpr = parsed.getCellVisibility();
+ ttl = parsed.getCellTTL();
Put put = new Put(rowKey.copyBytes());
for (int i = 0; i < parsed.getColumnCount(); i++) {
if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
- || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
+ || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
+ || i == parser.getCellTTLColumnIndex()) {
continue;
}
populatePut(lineBytes, parsed, put, i);
@@ -192,13 +201,26 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
// the validation
put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
}
+ if (ttl > 0) {
+ put.setTTL(ttl);
+ }
} else {
// Creating the KV which needs to be directly written to HFiles. Using the Facade
// KVCreator for creation of kvs.
+ List<Tag> tags = new ArrayList<Tag>();
+ if (cellVisibilityExpr != null) {
+ tags.addAll(kvCreator.getVisibilityExpressionResolver()
+ .createVisibilityExpTags(cellVisibilityExpr));
+ }
+ // Add TTL directly to the KV so we can vary them when packing more than one KV
+ // into puts
+ if (ttl > 0) {
+ tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+ }
cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
- parsed.getColumnLength(i), cellVisibilityExpr);
+ parsed.getColumnLength(i), tags);
}
put.add(cell);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
index 2f9b0be..ae41844 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.util.Bytes;
/**
* State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}.
- * Like {@link ScanDeleteTracker} and {@link ScanDeleteTracker} but does not
+ * Like {@link ScanQueryMatcher} and {@link ScanDeleteTracker} but does not
* implement the {@link DeleteTracker} interface since state spans rows (There
* is no update nor reset method).
*/
@@ -42,7 +42,8 @@ import org.apache.hadoop.hbase.util.Bytes;
class GetClosestRowBeforeTracker {
private final KeyValue targetkey;
// Any cell w/ a ts older than this is expired.
- private final long oldestts;
+ private final long now;
+ private final long oldestUnexpiredTs;
private Cell candidate = null;
private final KVComparator kvcomparator;
// Flag for whether we're doing getclosest on a metaregion.
@@ -75,20 +76,13 @@ class GetClosestRowBeforeTracker {
HConstants.DELIMITER) - this.rowoffset;
}
this.tablenamePlusDelimiterLength = metaregion? l + 1: -1;
- this.oldestts = System.currentTimeMillis() - ttl;
+ this.now = System.currentTimeMillis();
+ this.oldestUnexpiredTs = now - ttl;
this.kvcomparator = c;
KeyValue.RowOnlyComparator rc = new KeyValue.RowOnlyComparator(this.kvcomparator);
this.deletes = new TreeMap<KeyValue, NavigableSet<KeyValue>>(rc);
}
- /**
- * @param kv
- * @return True if this <code>kv</code> is expired.
- */
- boolean isExpired(final Cell kv) {
- return HStore.isExpired(kv, this.oldestts);
- }
-
/*
* Add the specified KeyValue to the list of deletes.
* @param kv
@@ -173,6 +167,15 @@ class GetClosestRowBeforeTracker {
return false;
}
+ /**
+ * Decide whether a cell is expired, either by its timestamp or by the
+ * cell TTL check in HStore.
+ * @param cell the cell to test
+ * @return true if the cell is expired
+ */
+ public boolean isExpired(final Cell cell) {
+ // Timestamp comparison first; HStore is only consulted when that does not
+ // already mark the cell as expired.
+ if (cell.getTimestamp() < oldestUnexpiredTs) {
+ return true;
+ }
+ return HStore.isCellTTLExpired(cell, oldestUnexpiredTs, now);
+ }
+
/*
* Handle keys whose values hold deletes.
* Add to the set of deletes and then if the candidate keys contain any that
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index ffc9cba..67cd70b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -85,6 +85,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
@@ -2644,6 +2646,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
prepareDeleteTimestamps(mutation, familyMaps[i], byteNow);
noOfDeletes++;
}
+ rewriteCellTags(familyMaps[i], mutation);
}
lock(this.updatesLock.readLock(), numReadyToWrite);
@@ -3118,6 +3121,59 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
}
}
+ /**
+ * Possibly rewrite incoming cell tags.
+ */
+ void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
+ // Check if we have any work to do and early out otherwise
+ // Update these checks as more logic is added here
+
+ if (m.getTTL() == Long.MAX_VALUE) {
+ return;
+ }
+
+ // From this point we know we have some work to do
+
+ for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
+ List<Cell> cells = e.getValue();
+ assert cells instanceof RandomAccess;
+ int listSize = cells.size();
+ for (int i = 0; i < listSize; i++) {
+ Cell cell = cells.get(i);
+ List<Tag> newTags = new ArrayList<Tag>();
+ Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
+ cell.getTagsOffset(), cell.getTagsLength());
+
+ // Carry forward existing tags
+
+ while (tagIterator.hasNext()) {
+
+ // Add any filters or tag specific rewrites here
+
+ newTags.add(tagIterator.next());
+ }
+
+ // Cell TTL handling
+
+ // Check again if we need to add a cell TTL because early out logic
+ // above may change when there are more tag based features in core.
+ if (m.getTTL() != Long.MAX_VALUE) {
+ // Add a cell TTL tag
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
+ }
+
+ // Rewrite the cell with the updated set of tags
+
+ cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+ cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+ cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+ cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
+ cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+ newTags));
+ }
+ }
+ }
+
/*
* Check if resources to support an update.
*
@@ -5215,6 +5271,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
processor.preBatchMutate(this, walEdit);
// 7. Apply to memstore
for (Mutation m : mutations) {
+ // Handle any tag based cell features
+ rewriteCellTags(m.getFamilyCellMap(), m);
+
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
Cell cell = cellScanner.current();
CellUtil.setSequenceId(cell, mvccNum);
@@ -5353,8 +5412,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
- // TODO: There's a lot of boiler plate code identical
- // to increment... See how to better unify that.
+ // TODO: There's a lot of boilerplate code identical to increment.
+ // We should refactor append and increment as local get-mutate-put
+ // transactions, so all stores only go through one code path for puts.
/**
* Perform one or more append operations on a row.
*
@@ -5425,8 +5485,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the append value
- // Avoid as much copying as possible. Every byte is copied at most
- // once.
+ // Avoid as much copying as possible. We may need to rewrite and
+ // consolidate tags. Bytes are only copied once.
// Would be nice if KeyValue had scatter/gather logic
int idx = 0;
for (Cell cell : family.getValue()) {
@@ -5436,40 +5496,87 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
&& CellUtil.matchingQualifier(results.get(idx), cell)) {
oldCell = results.get(idx);
long ts = Math.max(now, oldCell.getTimestamp());
- // allocate an empty kv once
+
+ // Process cell tags
+ List<Tag> newTags = new ArrayList<Tag>();
+
+ // Make a union of the set of tags in the old and new KVs
+
+ if (oldCell.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(oldCell.getTagsArray(),
+ oldCell.getTagsOffset(), oldCell.getTagsLength());
+ while (i.hasNext()) {
+ newTags.add(i.next());
+ }
+ }
+ if (cell.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ while (i.hasNext()) {
+ newTags.add(i.next());
+ }
+ }
+
+ // Cell TTL handling
+
+ if (append.getTTL() != Long.MAX_VALUE) {
+ // Add the new TTL tag
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+ }
+
+ // Rebuild tags
+ byte[] tagBytes = Tag.fromList(newTags);
+
+ // allocate an empty cell once
newCell = new KeyValue(row.length, cell.getFamilyLength(),
cell.getQualifierLength(), ts, KeyValue.Type.Put,
oldCell.getValueLength() + cell.getValueLength(),
- oldCell.getTagsLength() + cell.getTagsLength());
- // copy in the value
- System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
- newCell.getValueArray(), newCell.getValueOffset(),
- oldCell.getValueLength());
- System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
- newCell.getValueArray(),
- newCell.getValueOffset() + oldCell.getValueLength(),
- cell.getValueLength());
- // copy in the tags
- System.arraycopy(oldCell.getTagsArray(), oldCell.getTagsOffset(),
- newCell.getTagsArray(), newCell.getTagsOffset(), oldCell.getTagsLength());
- System.arraycopy(cell.getTagsArray(), cell.getTagsOffset(), newCell.getTagsArray(),
- newCell.getTagsOffset() + oldCell.getTagsLength(), cell.getTagsLength());
+ tagBytes.length);
// copy in row, family, and qualifier
System.arraycopy(cell.getRowArray(), cell.getRowOffset(),
- newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
+ newCell.getRowArray(), newCell.getRowOffset(), cell.getRowLength());
System.arraycopy(cell.getFamilyArray(), cell.getFamilyOffset(),
- newCell.getFamilyArray(), newCell.getFamilyOffset(),
- cell.getFamilyLength());
+ newCell.getFamilyArray(), newCell.getFamilyOffset(),
+ cell.getFamilyLength());
System.arraycopy(cell.getQualifierArray(), cell.getQualifierOffset(),
- newCell.getQualifierArray(), newCell.getQualifierOffset(),
- cell.getQualifierLength());
+ newCell.getQualifierArray(), newCell.getQualifierOffset(),
+ cell.getQualifierLength());
+ // copy in the value
+ System.arraycopy(oldCell.getValueArray(), oldCell.getValueOffset(),
+ newCell.getValueArray(), newCell.getValueOffset(),
+ oldCell.getValueLength());
+ System.arraycopy(cell.getValueArray(), cell.getValueOffset(),
+ newCell.getValueArray(),
+ newCell.getValueOffset() + oldCell.getValueLength(),
+ cell.getValueLength());
+ // Copy in tag data
+ System.arraycopy(tagBytes, 0, newCell.getTagsArray(), newCell.getTagsOffset(),
+ tagBytes.length);
idx++;
} else {
- // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
- // so only need to update the timestamp to 'now'
+ // Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP
CellUtil.updateLatestStamp(cell, now);
- newCell = cell;
- }
+
+ // Cell TTL handling
+
+ if (append.getTTL() != Long.MAX_VALUE) {
+ List<Tag> newTags = new ArrayList<Tag>(1);
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+ // Add the new TTL tag
+ newCell = new KeyValue(cell.getRowArray(), cell.getRowOffset(),
+ cell.getRowLength(),
+ cell.getFamilyArray(), cell.getFamilyOffset(),
+ cell.getFamilyLength(),
+ cell.getQualifierArray(), cell.getQualifierOffset(),
+ cell.getQualifierLength(),
+ cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
+ cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+ newTags);
+ } else {
+ newCell = cell;
+ }
+ }
+
CellUtil.setSequenceId(newCell, mvccNum);
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
@@ -5573,6 +5680,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
+ // TODO: There's a lot of boilerplate code identical to append.
+ // We should refactor append and increment as local get-mutate-put
+ // transactions, so all stores only go through one code path for puts.
/**
* Perform one or more increment operations on a row.
* @return new keyvalues after increment
@@ -5645,13 +5755,23 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the increment amount
int idx = 0;
- for (Cell kv: family.getValue()) {
- long amount = Bytes.toLong(CellUtil.cloneValue(kv));
+ for (Cell cell: family.getValue()) {
+ long amount = Bytes.toLong(CellUtil.cloneValue(cell));
boolean noWriteBack = (amount == 0);
+ List<Tag> newTags = new ArrayList<Tag>();
+
+ // Carry forward any tags that might have been added by a coprocessor
+ if (cell.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
+ cell.getTagsOffset(), cell.getTagsLength());
+ while (i.hasNext()) {
+ newTags.add(i.next());
+ }
+ }
Cell c = null;
long ts = now;
- if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
+ if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
c = results.get(idx);
ts = Math.max(now, c.getTimestamp());
if(c.getValueLength() == Bytes.SIZEOF_LONG) {
@@ -5661,32 +5781,36 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver { //
throw new org.apache.hadoop.hbase.DoNotRetryIOException(
"Attempted to increment field that isn't 64 bits wide");
}
+ // Carry tags forward from previous version
+ if (c.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(c.getTagsArray(),
+ c.getTagsOffset(), c.getTagsLength());
+ while (i.hasNext()) {
+ newTags.add(i.next());
+ }
+ }
idx++;
}
// Append new incremented KeyValue to list
- byte[] q = CellUtil.cloneQualifier(kv);
+ byte[] q = CellUtil.cloneQualifier(cell);
byte[] val = Bytes.toBytes(amount);
- int oldCellTagsLen = (c == null) ? 0 : c.getTagsLength();
- int incCellTagsLen = kv.getTagsLength();
- Cell newKV = new KeyValue(row.length, family.getKey().length, q.length, ts,
- KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
- System.arraycopy(row, 0, newKV.getRowArray(), newKV.getRowOffset(), row.length);
- System.arraycopy(family.getKey(), 0, newKV.getFamilyArray(), newKV.getFamilyOffset(),
- family.getKey().length);
- System.arraycopy(q, 0, newKV.getQualifierArray(), newKV.getQualifierOffset(), q.length);
- // copy in the value
- System.arraycopy(val, 0, newKV.getValueArray(), newKV.getValueOffset(), val.length);
- // copy tags
- if (oldCellTagsLen > 0) {
- System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getTagsArray(),
- newKV.getTagsOffset(), oldCellTagsLen);
- }
- if (incCellTagsLen > 0) {
- System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getTagsArray(),
- newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
+
+ // Add the TTL tag if the mutation carried one
+ if (increment.getTTL() != Long.MAX_VALUE) {
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
}
+
+ Cell newKV = new KeyValue(row, 0, row.length,
+ family.getKey(), 0, family.getKey().length,
+ q, 0, q.length,
+ ts,
+ KeyValue.Type.Put,
+ val, 0, val.length,
+ newTags);
+
CellUtil.setSequenceId(newKV, mvccNum);
+
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
newKV = coprocessorHost.postMutationBeforeWAL(
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 2adaaba..b3385d2 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -57,6 +57,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.conf.ConfigurationManager;
@@ -1673,8 +1675,38 @@ public class HStore implements Store {
return wantedVersions > maxVersions ? maxVersions: wantedVersions;
}
- static boolean isExpired(final Cell key, final long oldestTimestamp) {
- return key.getTimestamp() < oldestTimestamp;
+ /**
+ * Checks whether a cell has expired according to a TTL tag it carries.
+ * <p>
+ * Only the first TTL tag found on the cell is considered. The family level TTL is not
+ * evaluated here; it has to be applied by the caller.
+ * @param cell the cell to check
+ * @param oldestTimestamp oldest timestamp not yet expired by the family TTL; currently
+ * not read by this method
+ * @param now the current time in milliseconds
+ * @return true if the cell carries a TTL tag and that TTL has elapsed, false otherwise
+ */
+ static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
+ // Do not create an Iterator or Tag objects unless the cell actually has
+ // tags
+ if (cell.getTagsLength() > 0) {
+ // Look for a TTL tag. If a cell has multiple TTLs, resolve the conflict
+ // by using the first tag encountered.
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLength());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ if (TagType.TTL_TAG_TYPE == t.getType()) {
+ // Unlike in schema cell TTLs are stored in milliseconds, no need
+ // to convert
+ long ts = cell.getTimestamp();
+ assert t.getTagLength() == Bytes.SIZEOF_LONG;
+ long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
+ // Compare the elapsed time against the TTL instead of computing ts + ttl:
+ // the sum overflows for very large TTL values and would report the cell
+ // as expired right away.
+ // NOTE(review): assumes ts and now are non-negative so that now - ts
+ // cannot itself overflow - confirm against the write path.
+ if (now - ts > ttl) {
+ return true;
+ }
+ // Per cell TTLs cannot extend lifetime beyond family settings; that
+ // bound is not checked here, so stop at the first TTL tag and report
+ // the cell as not expired by its own TTL.
+ break;
+ }
+ }
+ }
+ return false;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index 56e6b8a..c46da2a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -102,6 +102,10 @@ public class ScanQueryMatcher {
private final long earliestPutTs;
private final long ttl;
+ /** The oldest timestamp we are interested in, based on TTL */
+ private final long oldestUnexpiredTS;
+ private final long now;
+
/** readPoint over which the KVs are unconditionally included */
protected long maxReadPointToTrackVersions;
@@ -154,7 +158,7 @@ public class ScanQueryMatcher {
*/
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
- RegionCoprocessorHost regionCoprocessorHost) throws IOException {
+ long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
this.tr = scan.getTimeRange();
this.rowComparator = scanInfo.getComparator();
this.regionCoprocessorHost = regionCoprocessorHost;
@@ -164,6 +168,9 @@ public class ScanQueryMatcher {
scanInfo.getFamily());
this.filter = scan.getFilter();
this.earliestPutTs = earliestPutTs;
+ this.oldestUnexpiredTS = oldestUnexpiredTS;
+ this.now = now;
+
this.maxReadPointToTrackVersions = readPointToUse;
this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
this.ttl = oldestUnexpiredTS;
@@ -218,18 +225,18 @@ public class ScanQueryMatcher {
* @param scanInfo The store's immutable scan info
* @param columns
* @param earliestPutTs Earliest put seen in any of the store files.
- * @param oldestUnexpiredTS the oldest timestamp we are interested in,
- * based on TTL
+ * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
+ * @param now the current server time
* @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
* @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
* @param regionCoprocessorHost
* @throws IOException
*/
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
- long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, byte[] dropDeletesFromRow,
+ long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs,
- oldestUnexpiredTS, regionCoprocessorHost);
+ oldestUnexpiredTS, now, regionCoprocessorHost);
Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null));
this.dropDeletesFromRow = dropDeletesFromRow;
this.dropDeletesToRow = dropDeletesToRow;
@@ -239,10 +246,10 @@ public class ScanQueryMatcher {
* Constructor for tests
*/
ScanQueryMatcher(Scan scan, ScanInfo scanInfo,
- NavigableSet<byte[]> columns, long oldestUnexpiredTS) throws IOException {
+ NavigableSet<byte[]> columns, long oldestUnexpiredTS, long now) throws IOException {
this(scan, scanInfo, columns, ScanType.USER_SCAN,
Long.MAX_VALUE, /* max Readpoint to track versions */
- HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, null);
+ HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, null);
}
/**
@@ -300,12 +307,17 @@ public class ScanQueryMatcher {
int qualifierOffset = cell.getQualifierOffset();
int qualifierLength = cell.getQualifierLength();
+
long timestamp = cell.getTimestamp();
// check for early out based on timestamp alone
if (columns.isDone(timestamp)) {
return columns.getNextRowOrNextColumn(cell.getQualifierArray(), qualifierOffset,
qualifierLength);
}
+ // check if the cell is expired by cell TTL
+ if (HStore.isCellTTLExpired(cell, this.oldestUnexpiredTS, this.now)) {
+ return MatchCode.SKIP;
+ }
/*
* The delete logic is pretty complicated now.
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index ab7370c..9db116e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -77,6 +77,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected final Scan scan;
protected final NavigableSet<byte[]> columns;
protected final long oldestUnexpiredTS;
+ protected final long now;
protected final int minVersions;
protected final long maxRowSize;
@@ -123,7 +124,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
explicitColumnQuery = numCol > 0;
this.scan = scan;
this.columns = columns;
- oldestUnexpiredTS = EnvironmentEdgeManager.currentTime() - ttl;
+ this.now = EnvironmentEdgeManager.currentTime();
+ this.oldestUnexpiredTS = now - ttl;
this.minVersions = minVersions;
if (store != null && ((HStore)store).getHRegion() != null
@@ -176,7 +178,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
matcher = new ScanQueryMatcher(scan, scanInfo, columns,
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
- oldestUnexpiredTS, store.getCoprocessorHost());
+ oldestUnexpiredTS, now, store.getCoprocessorHost());
this.store.addChangedReaderObserver(this);
@@ -241,10 +243,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
if (dropDeletesFromRow == null) {
matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
- earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost());
+ earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
} else {
matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
- oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
+ oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
}
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
@@ -284,7 +286,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
scanInfo.getMinVersions(), readPt);
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
- Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null);
+ Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
// In unit tests, the store could be null
if (this.store != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
new file mode 100644
index 0000000..7ea69f2
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestImportTSVWithTTLs implements Configurable {
+
+ protected static final Log LOG = LogFactory.getLog(TestImportTSVWithTTLs.class);
+ protected static final String NAME = TestImportTsv.class.getSimpleName();
+ protected static HBaseTestingUtility util = new HBaseTestingUtility();
+
+ /**
+ * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
+ * false.
+ */
+ protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
+
+ /**
+ * Force use of combiner in doMROnTableTest. Boolean. Default is true.
+ */
+ protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
+
+ private final String FAMILY = "FAM";
+ private static Configuration conf;
+
+ @Override
+ public Configuration getConf() {
+ return util.getConfiguration();
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ throw new IllegalArgumentException("setConf not supported");
+ }
+
+ @BeforeClass
+ public static void provisionCluster() throws Exception {
+ conf = util.getConfiguration();
+ // We don't check persistence in HFiles in this test, but if we ever do we will
+ // need this where the default hfile version is not 3 (i.e. 0.98)
+ conf.setInt("hfile.format.version", 3);
+ conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName());
+ util.startMiniCluster();
+ util.startMiniMapReduceCluster();
+ }
+
+ @AfterClass
+ public static void releaseCluster() throws Exception {
+ util.shutdownMiniMapReduceCluster();
+ util.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testMROnTable() throws Exception {
+ String tableName = "test-" + UUID.randomUUID();
+
+ // Prepare the arguments required for the test.
+ String[] args = new String[] {
+ "-D" + ImportTsv.MAPPER_CONF_KEY
+ + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
+ "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
+ "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
+ String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n";
+ util.createTable(TableName.valueOf(tableName), FAMILY);
+ doMROnTableTest(util, FAMILY, data, args, 1);
+ util.deleteTable(tableName);
+ }
+
+ protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+ String[] args, int valueMultiplier) throws Exception {
+ TableName table = TableName.valueOf(args[args.length - 1]);
+ Configuration conf = new Configuration(util.getConfiguration());
+
+ // populate input file
+ FileSystem fs = FileSystem.get(conf);
+ Path inputPath = fs.makeQualified(new Path(util
+ .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
+ FSDataOutputStream op = fs.create(inputPath, true);
+ op.write(Bytes.toBytes(data));
+ op.close();
+ LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+ if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+ LOG.debug("Forcing combiner.");
+ conf.setInt("mapreduce.map.combine.minspills", 1);
+ }
+
+ // run the import
+ List<String> argv = new ArrayList<String>(Arrays.asList(args));
+ argv.add(inputPath.toString());
+ Tool tool = new ImportTsv();
+ LOG.debug("Running ImportTsv with arguments: " + argv);
+ try {
+ // Job will fail if observer rejects entries without TTL
+ assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
+ } finally {
+ // Clean up
+ if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+ LOG.debug("Deleting test subdirectory");
+ util.cleanupDataTestDirOnTestFS(table.getNameAsString());
+ }
+ }
+
+ return tool;
+ }
+
+ public static class TTLCheckingObserver extends BaseRegionObserver {
+
+ @Override
+ public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
+ Durability durability) throws IOException {
+ HRegion region = e.getEnvironment().getRegion();
+ if (!region.getRegionInfo().isMetaTable()
+ && !region.getRegionInfo().getTable().isSystemTable()) {
+ // The put carries the TTL attribute
+ if (put.getTTL() != Long.MAX_VALUE) {
+ return;
+ }
+ throw new IOException("Operation does not have TTL set");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 2f3e59a..4d74cd8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -78,6 +78,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -91,6 +92,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
@@ -112,6 +114,7 @@ import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -139,6 +142,7 @@ import org.apache.hadoop.hbase.wal.WALSplitter;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
@@ -5766,6 +5770,133 @@ public class TestHRegion {
}
}
+ @Test
+ public void testCellTTLs() throws IOException {
+ IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
+ EnvironmentEdgeManager.injectEdge(edge);
+
+ final byte[] row = Bytes.toBytes("testRow");
+ final byte[] q1 = Bytes.toBytes("q1");
+ final byte[] q2 = Bytes.toBytes("q2");
+ final byte[] q3 = Bytes.toBytes("q3");
+ final byte[] q4 = Bytes.toBytes("q4");
+
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testCellTTLs"));
+ HColumnDescriptor hcd = new HColumnDescriptor(fam1);
+ hcd.setTimeToLive(10); // 10 seconds
+ htd.addFamily(hcd);
+
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
+
+ HRegion region = HRegion.createHRegion(new HRegionInfo(htd.getTableName(),
+ HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
+ TEST_UTIL.getDataTestDir(), conf, htd);
+ assertNotNull(region);
+ try {
+ long now = EnvironmentEdgeManager.currentTime();
+ // Add a cell that will expire in 5 seconds via cell TTL
+ region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
+ HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+ // TTL tags specify ts in milliseconds
+ new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+ // Add a cell that will expire after 10 seconds via family setting
+ region.put(new Put(row).add(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
+ // Add a cell that will expire in 15 seconds via cell TTL
+ region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
+ HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+ // TTL tags specify ts in milliseconds
+ new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+ // Add a cell that will expire in 20 seconds via family setting
+ region.put(new Put(row).add(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
+
+ // Flush so we are sure store scanning gets this right
+ region.flushcache();
+
+ // A query at time T+0 should return all cells
+ Result r = region.get(new Get(row));
+ assertNotNull(r.getValue(fam1, q1));
+ assertNotNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+5 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNotNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+10 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+15 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time by 10 seconds to about T+25 seconds, past q4's expiry at about T+20
+ edge.incrementTime(10000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNull(r.getValue(fam1, q3));
+ assertNull(r.getValue(fam1, q4));
+
+ // Fun with disappearing increments
+
+ // Start at 1
+ region.put(new Put(row).add(fam1, q1, Bytes.toBytes(1L)));
+ r = region.get(new Get(row));
+ byte[] val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(Bytes.toLong(val), 1L);
+
+ // Increment with a TTL of 5 seconds
+ Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
+ incr.setTTL(5000);
+ region.increment(incr); // 2
+
+ // New value should be 2
+ r = region.get(new Get(row));
+ val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(Bytes.toLong(val), 2L);
+
+ // Increment time by 5 seconds to about T+30 seconds
+ edge.incrementTime(5000);
+
+ // Value should be back to 1
+ r = region.get(new Get(row));
+ val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(Bytes.toLong(val), 1L);
+
+ // Increment time by 5 seconds to about T+35 seconds
+ edge.incrementTime(5000);
+
+ // Original value written at about T+25 should be gone now via the 10 second family TTL
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+
+ } finally {
+ HRegion.closeHRegion(region);
+ }
+ }
+
private static HRegion initHRegion(byte[] tableName, String callingMethod,
byte[]... families) throws IOException {
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
index 35b71e7..d1270da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
@@ -91,11 +91,12 @@ public class TestQueryMatcher extends HBaseTestCase {
}
- private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
- // 2,4,5
+ private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
+ long now = EnvironmentEdgeManager.currentTime();
+ // 2,4,5
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), get.getFamilyMap().get(fam2),
- EnvironmentEdgeManager.currentTime() - ttl);
+ now - ttl, now);
List<KeyValue> memstore = new ArrayList<KeyValue>();
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -175,9 +176,10 @@ public class TestQueryMatcher extends HBaseTestCase {
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.DONE);
+ long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null,
- EnvironmentEdgeManager.currentTime() - ttl);
+ now - ttl, now);
List<KeyValue> memstore = new ArrayList<KeyValue>();
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -230,8 +232,8 @@ public class TestQueryMatcher extends HBaseTestCase {
long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm =
- new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
- rowComparator), get.getFamilyMap().get(fam2), now - testTTL);
+ new ScanQueryMatcher(scan, new ScanInfo(fam2, 0, 1, testTTL, KeepDeletedCells.FALSE, 0,
+ rowComparator), get.getFamilyMap().get(fam2), now - testTTL, now);
KeyValue [] kvs = new KeyValue[] {
new KeyValue(row1, fam2, col1, now-100, data),
@@ -285,7 +287,7 @@ public class TestQueryMatcher extends HBaseTestCase {
long now = EnvironmentEdgeManager.currentTime();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null,
- now - testTTL);
+ now - testTTL, now);
KeyValue [] kvs = new KeyValue[] {
new KeyValue(row1, fam2, col1, now-100, data),
@@ -343,7 +345,7 @@ public class TestQueryMatcher extends HBaseTestCase {
NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,
- HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to, null);
+ HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, now, from, to, null);
List<ScanQueryMatcher.MatchCode> actual =
new ArrayList<ScanQueryMatcher.MatchCode>(rows.length);
byte[] prevRow = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
index 14bd9d6..bad9084 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
@@ -418,8 +418,13 @@ public class TestTags {
tags = TestCoprocessorForTags.tags;
assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
assertEquals(2, tags.size());
- assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
- assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+ // We cannot assume the ordering of tags
+ List<String> tagValues = new ArrayList<String>();
+ for (Tag tag: tags) {
+ tagValues.add(Bytes.toString(tag.getValue()));
+ }
+ assertTrue(tagValues.contains("tag1"));
+ assertTrue(tagValues.contains("tag2"));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
@@ -475,8 +480,13 @@ public class TestTags {
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
tags = TestCoprocessorForTags.tags;
assertEquals(2, tags.size());
- assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
- assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+ // We cannot assume the ordering of tags
+ tagValues.clear();
+ for (Tag tag: tags) {
+ tagValues.add(Bytes.toString(tag.getValue()));
+ }
+ assertTrue(tagValues.contains("tag1"));
+ assertTrue(tagValues.contains("tag2"));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-shell/src/main/ruby/hbase/table.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/table.rb b/hbase-shell/src/main/ruby/hbase/table.rb
index 85da4ed..1ec1cc4 100644
--- a/hbase-shell/src/main/ruby/hbase/table.rb
+++ b/hbase-shell/src/main/ruby/hbase/table.rb
@@ -141,17 +141,19 @@ EOF
set_attributes(p, attributes) if attributes
visibility = args[VISIBILITY]
set_cell_visibility(p, visibility) if visibility
+ ttl = args[TTL]
+ set_op_ttl(p, ttl) if ttl
end
#Case where attributes are specified without timestamp
if timestamp.kind_of?(Hash)
timestamp.each do |k, v|
- if v.kind_of?(Hash)
- set_attributes(p, v) if v
- end
- if v.kind_of?(String)
- set_cell_visibility(p, v) if v
- end
-
+ if k == 'ATTRIBUTES'
+ set_attributes(p, v)
+ elsif k == 'VISIBILITY'
+ set_cell_visibility(p, v)
+ elsif k == "TTL"
+ set_op_ttl(p, v)
+ end
end
timestamp = nil
end
@@ -219,6 +221,8 @@ EOF
visibility = args[VISIBILITY]
set_attributes(incr, attributes) if attributes
set_cell_visibility(incr, visibility) if visibility
+ ttl = args[TTL]
+ set_op_ttl(incr, ttl) if ttl
end
incr.addColumn(family, qualifier, value)
@table.increment(incr)
@@ -237,6 +241,8 @@ EOF
visibility = args[VISIBILITY]
set_attributes(append, attributes) if attributes
set_cell_visibility(append, visibility) if visibility
+ ttl = args[TTL]
+ set_op_ttl(append, ttl) if ttl
end
append.add(family, qualifier, value.to_s.to_java_bytes)
@table.append(append)
@@ -545,6 +551,10 @@ EOF
auths.to_java(:string)))
end
+ def set_op_ttl(op, ttl)
+ op.setTTL(ttl.to_java(:long))
+ end
+
#----------------------------
# Add general administration utilities to the shell
# each of the names below adds this method name to the table
http://git-wip-us.apache.org/repos/asf/hbase/blob/004e977b/hbase-shell/src/test/ruby/hbase/table_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/table_test.rb b/hbase-shell/src/test/ruby/hbase/table_test.rb
index 7272229..fa2990d 100644
--- a/hbase-shell/src/test/ruby/hbase/table_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/table_test.rb
@@ -530,5 +530,18 @@ module Hbase
end
end
+ define_test "mutation with TTL should expire" do
+ @test_table.put('ttlTest', 'x:a', 'foo', { TTL => 1000 } )
+ begin
+ res = @test_table._get_internal('ttlTest', 'x:a')
+ assert_not_nil(res)
+ sleep 2
+ res = @test_table._get_internal('ttlTest', 'x:a')
+ assert_nil(res)
+ ensure
+ @test_table.delete('ttlTest', 'x:a')
+ end
+ end
+
end
end
[3/3] hbase git commit: HBASE-10560 Per cell TTLs
Posted by ap...@apache.org.
HBASE-10560 Per cell TTLs
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/869c5665
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/869c5665
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/869c5665
Branch: refs/heads/0.98
Commit: 869c5665cd4cc0cd7994933e839154002f682f44
Parents: 45c8be3
Author: Andrew Purtell <ap...@apache.org>
Authored: Fri Dec 5 11:00:45 2014 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Fri Dec 5 11:53:59 2014 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/Delete.java | 5 +
.../apache/hadoop/hbase/client/Mutation.java | 34 +++
.../java/org/apache/hadoop/hbase/TagType.java | 1 +
.../hadoop/hbase/mapreduce/CellCreator.java | 35 ++-
.../hadoop/hbase/mapreduce/ImportTsv.java | 55 +++-
.../hadoop/hbase/mapreduce/TextSortReducer.java | 24 +-
.../hbase/mapreduce/TsvImporterMapper.java | 26 +-
.../GetClosestRowBeforeTracker.java | 26 +-
.../hadoop/hbase/regionserver/HRegion.java | 255 ++++++++++++++-----
.../hadoop/hbase/regionserver/HStore.java | 37 ++-
.../hbase/regionserver/ScanQueryMatcher.java | 30 ++-
.../hadoop/hbase/regionserver/StoreScanner.java | 12 +-
.../hbase/mapreduce/TestImportTSVWithTTLs.java | 171 +++++++++++++
.../hadoop/hbase/regionserver/TestHRegion.java | 131 ++++++++++
.../hbase/regionserver/TestQueryMatcher.java | 16 +-
.../hadoop/hbase/regionserver/TestTags.java | 18 +-
hbase-shell/src/main/ruby/hbase/table.rb | 24 +-
hbase-shell/src/test/ruby/hbase/table_test.rb | 13 +
18 files changed, 794 insertions(+), 119 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
index f1c954f..8a47e44 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Delete.java
@@ -323,4 +323,9 @@ public class Delete extends Mutation implements Comparable<Row> {
map.put("ts", this.ts);
return map;
}
+
+ @Override
+ public Delete setTTL(long ttl) {
+ throw new UnsupportedOperationException("Setting TTLs on Deletes is not supported");
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
index 869d940..ec41568 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Mutation.java
@@ -78,6 +78,11 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
*/
private static final String CONSUMED_CLUSTER_IDS = "_cs.id";
+ /**
+ * The attribute for storing TTL for the result of the mutation.
+ */
+ private static final String OP_ATTRIBUTE_TTL = "_ttl";
+
protected byte [] row = null;
protected long ts = HConstants.LATEST_TIMESTAMP;
protected Durability durability = Durability.USE_DEFAULT;
@@ -206,6 +211,12 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
if (getId() != null) {
map.put("id", getId());
}
+ // Add the TTL if set
+ // Long.MAX_VALUE is the default, and is interpreted to mean this attribute
+ // has not been set.
+ if (getTTL() != Long.MAX_VALUE) {
+ map.put("ttl", getTTL());
+ }
return map;
}
@@ -470,6 +481,29 @@ public abstract class Mutation extends OperationWithAttributes implements Row, C
}
/**
+ * Return the TTL requested for the result of the mutation, in milliseconds.
+ * @return the TTL requested for the result of the mutation, in milliseconds,
+ * or Long.MAX_VALUE if unset
+ */
+ public long getTTL() {
+ byte[] ttlBytes = getAttribute(OP_ATTRIBUTE_TTL);
+ if (ttlBytes != null) {
+ return Bytes.toLong(ttlBytes);
+ }
+ return Long.MAX_VALUE;
+ }
+
+ /**
+ * Set the TTL desired for the result of the mutation, in milliseconds.
+ * @param ttl the TTL desired for the result of the mutation, in milliseconds
+ * @return this
+ */
+ public Mutation setTTL(long ttl) {
+ setAttribute(OP_ATTRIBUTE_TTL, Bytes.toBytes(ttl));
+ return this;
+ }
+
+ /**
* Subclasses should override this method to add the heap size of their own fields.
* @return the heap size to add (will be aligned).
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
index b113516..65b2c91 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/TagType.java
@@ -30,4 +30,5 @@ public final class TagType {
public static final byte VISIBILITY_EXP_SERIALIZATION_FORMAT_TAG_TYPE = (byte)4;
// String based tag type used in replication
public static final byte STRING_VIS_TAG_TYPE = (byte) 7;
+ public static final byte TTL_TAG_TYPE = (byte)8;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
index b3dfee7..001f64d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/CellCreator.java
@@ -69,7 +69,7 @@ public class CellCreator {
byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
int vlength) throws IOException {
return create(row, roffset, rlength, family, foffset, flength, qualifier, qoffset, qlength,
- timestamp, value, voffset, vlength, null);
+ timestamp, value, voffset, vlength, (List<Tag>)null);
}
/**
@@ -90,6 +90,7 @@ public class CellCreator {
* @return created Cell
* @throws IOException
*/
+ @Deprecated
public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
int vlength, String visExpression) throws IOException {
@@ -100,4 +101,36 @@ public class CellCreator {
return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, visTags);
}
+
+ /**
+ * @param row row key
+ * @param roffset row offset
+ * @param rlength row length
+ * @param family family name
+ * @param foffset family offset
+ * @param flength family length
+ * @param qualifier column qualifier
+ * @param qoffset qualifier offset
+ * @param qlength qualifier length
+ * @param timestamp version timestamp
+ * @param value column value
+ * @param voffset value offset
+ * @param vlength value length
+ * @param tags tags to attach to the created cell; may be null
+ * @return created Cell
+ * @throws IOException
+ */
+ public Cell create(byte[] row, int roffset, int rlength, byte[] family, int foffset, int flength,
+ byte[] qualifier, int qoffset, int qlength, long timestamp, byte[] value, int voffset,
+ int vlength, List<Tag> tags) throws IOException {
+ return new KeyValue(row, roffset, rlength, family, foffset, flength, qualifier, qoffset,
+ qlength, timestamp, KeyValue.Type.Put, value, voffset, vlength, tags);
+ }
+
+ /**
+ * @return Visibility expression resolver
+ */
+ public VisibilityExpressionResolver getVisibilityExpressionResolver() {
+ return this.visExpResolver;
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
index a1f84bb..6c154f5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ImportTsv.java
@@ -121,13 +121,20 @@ public class ImportTsv extends Configured implements Tool {
public static final String CELL_VISIBILITY_COLUMN_SPEC = "HBASE_CELL_VISIBILITY";
+ public static final String CELL_TTL_COLUMN_SPEC = "HBASE_CELL_TTL";
+
private int attrKeyColumnIndex = DEFAULT_ATTRIBUTES_COLUMN_INDEX;
public static final int DEFAULT_ATTRIBUTES_COLUMN_INDEX = -1;
public static final int DEFAULT_CELL_VISIBILITY_COLUMN_INDEX = -1;
+ public static final int DEFAULT_CELL_TTL_COLUMN_INDEX = -1;
+
private int cellVisibilityColumnIndex = DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
+
+ private int cellTTLColumnIndex = DEFAULT_CELL_TTL_COLUMN_INDEX;
+
/**
* @param columnsSpecification the list of columns to parser out, comma separated.
* The row key should be the special token TsvParser.ROWKEY_COLUMN_SPEC
@@ -158,14 +165,18 @@ public class ImportTsv extends Configured implements Tool {
timestampKeyColumnIndex = i;
continue;
}
- if(ATTRIBUTES_COLUMN_SPEC.equals(str)) {
+ if (ATTRIBUTES_COLUMN_SPEC.equals(str)) {
attrKeyColumnIndex = i;
continue;
}
- if(CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
+ if (CELL_VISIBILITY_COLUMN_SPEC.equals(str)) {
cellVisibilityColumnIndex = i;
continue;
}
+ if (CELL_TTL_COLUMN_SPEC.equals(str)) {
+ cellTTLColumnIndex = i;
+ continue;
+ }
String[] parts = str.split(":", 2);
if (parts.length == 1) {
families[i] = str.getBytes();
@@ -193,6 +204,10 @@ public class ImportTsv extends Configured implements Tool {
return cellVisibilityColumnIndex != DEFAULT_CELL_VISIBILITY_COLUMN_INDEX;
}
+ /**
+ * @return true if the columns specification named a HBASE_CELL_TTL column
+ */
+ public boolean hasCellTTL() {
+ // Compare against the TTL sentinel rather than the cell visibility one; the two
+ // constants currently share the value -1 but are independent settings.
+ return cellTTLColumnIndex != DEFAULT_CELL_TTL_COLUMN_INDEX;
+ }
+
public int getAttributesKeyColumnIndex() {
return attrKeyColumnIndex;
}
@@ -200,9 +215,15 @@ public class ImportTsv extends Configured implements Tool {
public int getCellVisibilityColumnIndex() {
return cellVisibilityColumnIndex;
}
+
+ public int getCellTTLColumnIndex() {
+ return cellTTLColumnIndex;
+ }
+
public int getRowKeyColumnIndex() {
return rowKeyColumnIndex;
}
+
public byte[] getFamily(int idx) {
return families[idx];
}
@@ -234,8 +255,10 @@ public class ImportTsv extends Configured implements Tool {
throw new BadTsvLineException("No timestamp");
} else if (hasAttributes() && tabOffsets.size() <= getAttributesKeyColumnIndex()) {
throw new BadTsvLineException("No attributes specified");
- } else if(hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
+ } else if (hasCellVisibility() && tabOffsets.size() <= getCellVisibilityColumnIndex()) {
throw new BadTsvLineException("No cell visibility specified");
+ } else if (hasCellTTL() && tabOffsets.size() <= getCellTTLColumnIndex()) {
+ throw new BadTsvLineException("No cell TTL specified");
}
return new ParsedLine(tabOffsets, lineBytes);
}
@@ -332,6 +355,31 @@ public class ImportTsv extends Configured implements Tool {
}
}
+ public int getCellTTLColumnOffset() {
+ if (hasCellTTL()) {
+ return getColumnOffset(cellTTLColumnIndex);
+ } else {
+ return DEFAULT_CELL_TTL_COLUMN_INDEX;
+ }
+ }
+
+ public int getCellTTLColumnLength() {
+ if (hasCellTTL()) {
+ return getColumnLength(cellTTLColumnIndex);
+ } else {
+ return DEFAULT_CELL_TTL_COLUMN_INDEX;
+ }
+ }
+
+ /**
+ * Reads the cell TTL column of this parsed line.
+ * @return the TTL parsed from the HBASE_CELL_TTL column as a decimal number, or 0 if
+ * no TTL column was specified (callers only apply a TTL when the value is > 0)
+ * @throws NumberFormatException if the column text is not a valid decimal long
+ */
+ public long getCellTTL() {
+ if (!hasCellTTL()) {
+ return 0;
+ }
+ // The column arrives as text in the input line. Bytes.toLong decodes a fixed-width
+ // binary long and rejects any other length, so decode the string and parse it.
+ // NumberFormatException is an IllegalArgumentException, matching the failure type
+ // the binary decode raised for malformed input.
+ return Long.parseLong(Bytes.toString(lineBytes, getColumnOffset(cellTTLColumnIndex),
+ getColumnLength(cellTTLColumnIndex)));
+ }
+
public int getColumnOffset(int idx) {
if (idx > 0)
return tabOffsets.get(idx - 1) + 1;
@@ -489,6 +537,7 @@ public class ImportTsv extends Configured implements Tool {
if (TsvParser.ROWKEY_COLUMN_SPEC.equals(aColumn)
|| TsvParser.TIMESTAMPKEY_COLUMN_SPEC.equals(aColumn)
|| TsvParser.CELL_VISIBILITY_COLUMN_SPEC.equals(aColumn)
+ || TsvParser.CELL_TTL_COLUMN_SPEC.equals(aColumn)
|| TsvParser.ATTRIBUTES_COLUMN_SPEC.equals(aColumn))
continue;
// we are only concerned with the first one (in case this is a cf:cq)
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
index 4a0e0fd..b3981a1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TextSortReducer.java
@@ -18,7 +18,9 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Iterator;
+import java.util.List;
import java.util.Set;
import java.util.TreeSet;
@@ -28,8 +30,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueUtil;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Reducer;
@@ -62,6 +67,9 @@ public class TextSortReducer extends
/** Cell visibility expr **/
private String cellVisibilityExpr;
+ /** Cell TTL */
+ private long ttl;
+
private CellCreator kvCreator;
public long getTs() {
@@ -148,18 +156,30 @@ public class TextSortReducer extends
// Retrieve timestamp if exists
ts = parsed.getTimestamp(ts);
cellVisibilityExpr = parsed.getCellVisibility();
+ ttl = parsed.getCellTTL();
for (int i = 0; i < parsed.getColumnCount(); i++) {
if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
- || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
+ || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
+ || i == parser.getCellTTLColumnIndex()) {
continue;
}
// Creating the KV which needs to be directly written to HFiles. Using the Facade
// KVCreator for creation of kvs.
+ List<Tag> tags = new ArrayList<Tag>();
+ if (cellVisibilityExpr != null) {
+ tags.addAll(kvCreator.getVisibilityExpressionResolver()
+ .createVisibilityExpTags(cellVisibilityExpr));
+ }
+ // Add TTL directly to the KV so we can vary them when packing more than one KV
+ // into puts
+ if (ttl > 0) {
+ tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+ }
Cell cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(),
parsed.getRowKeyLength(), parser.getFamily(i), 0, parser.getFamily(i).length,
parser.getQualifier(i), 0, parser.getQualifier(i).length, ts, lineBytes,
- parsed.getColumnOffset(i), parsed.getColumnLength(i), cellVisibilityExpr);
+ parsed.getColumnOffset(i), parsed.getColumnLength(i), tags);
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
kvs.add(kv);
curSize += kv.heapSize();
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
index ff84081..270de75 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/TsvImporterMapper.java
@@ -18,17 +18,22 @@
package org.apache.hadoop.hbase.mapreduce;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.ImportTsv.TsvParser.BadTsvLineException;
import org.apache.hadoop.hbase.security.visibility.CellVisibility;
import org.apache.hadoop.hbase.util.Base64;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;
@@ -59,6 +64,8 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
protected String cellVisibilityExpr;
+ protected long ttl;
+
protected CellCreator kvCreator;
private String hfileOutPath;
@@ -144,11 +151,13 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
// Retrieve timestamp if exists
ts = parsed.getTimestamp(ts);
cellVisibilityExpr = parsed.getCellVisibility();
+ ttl = parsed.getCellTTL();
Put put = new Put(rowKey.copyBytes());
for (int i = 0; i < parsed.getColumnCount(); i++) {
if (i == parser.getRowKeyColumnIndex() || i == parser.getTimestampKeyColumnIndex()
- || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()) {
+ || i == parser.getAttributesKeyColumnIndex() || i == parser.getCellVisibilityColumnIndex()
+ || i == parser.getCellTTLColumnIndex()) {
continue;
}
populatePut(lineBytes, parsed, put, i);
@@ -192,13 +201,26 @@ extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put>
// the validation
put.setCellVisibility(new CellVisibility(cellVisibilityExpr));
}
+ if (ttl > 0) {
+ put.setTTL(ttl);
+ }
} else {
// Creating the KV which needs to be directly written to HFiles. Using the Facade
// KVCreator for creation of kvs.
+ List<Tag> tags = new ArrayList<Tag>();
+ if (cellVisibilityExpr != null) {
+ tags.addAll(kvCreator.getVisibilityExpressionResolver()
+ .createVisibilityExpTags(cellVisibilityExpr));
+ }
+ // Add TTL directly to the KV so we can vary them when packing more than one KV
+ // into puts
+ if (ttl > 0) {
+ tags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(ttl)));
+ }
cell = this.kvCreator.create(lineBytes, parsed.getRowKeyOffset(), parsed.getRowKeyLength(),
parser.getFamily(i), 0, parser.getFamily(i).length, parser.getQualifier(i), 0,
parser.getQualifier(i).length, ts, lineBytes, parsed.getColumnOffset(i),
- parsed.getColumnLength(i), cellVisibilityExpr);
+ parsed.getColumnLength(i), tags);
}
put.add(cell);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
index d37aa5a..876aae0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/GetClosestRowBeforeTracker.java
@@ -24,6 +24,7 @@ import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
@@ -31,7 +32,7 @@ import org.apache.hadoop.hbase.util.Bytes;
/**
* State and utility processing {@link HRegion#getClosestRowBefore(byte[], byte[])}.
- * Like {@link ScanDeleteTracker} and {@link ScanDeleteTracker} but does not
+ * Like {@link ScanQueryMatcher} and {@link ScanDeleteTracker} but does not
* implement the {@link DeleteTracker} interface since state spans rows (There
* is no update nor reset method).
*/
@@ -39,7 +40,8 @@ import org.apache.hadoop.hbase.util.Bytes;
class GetClosestRowBeforeTracker {
private final KeyValue targetkey;
// Any cell w/ a ts older than this is expired.
- private final long oldestts;
+ private final long now;
+ private final long oldestUnexpiredTs;
private KeyValue candidate = null;
private final KVComparator kvcomparator;
// Flag for whether we're doing getclosest on a metaregion.
@@ -72,20 +74,13 @@ class GetClosestRowBeforeTracker {
HConstants.DELIMITER) - this.rowoffset;
}
this.tablenamePlusDelimiterLength = metaregion? l + 1: -1;
- this.oldestts = System.currentTimeMillis() - ttl;
+ this.now = System.currentTimeMillis();
+ this.oldestUnexpiredTs = now - ttl;
this.kvcomparator = c;
KeyValue.RowOnlyComparator rc = new KeyValue.RowOnlyComparator(this.kvcomparator);
this.deletes = new TreeMap<KeyValue, NavigableSet<KeyValue>>(rc);
}
- /**
- * @param kv
- * @return True if this <code>kv</code> is expired.
- */
- boolean isExpired(final KeyValue kv) {
- return HStore.isExpired(kv, this.oldestts);
- }
-
/*
* Add the specified KeyValue to the list of deletes.
* @param kv
@@ -170,6 +165,15 @@ class GetClosestRowBeforeTracker {
return false;
}
+ /**
+ * @param cell
+ * @return true if the cell is expired
+ */
+ public boolean isExpired(final Cell cell) {
+ return cell.getTimestamp() < this.oldestUnexpiredTs ||
+ HStore.isCellTTLExpired(cell, this.oldestUnexpiredTs, this.now);
+ }
+
/*
* Handle keys whose values hold deletes.
* Add to the set of deletes and then if the candidate keys contain any that
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 1d554fc..777dcfe 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -30,6 +30,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
@@ -84,6 +85,8 @@ import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RegionTooBusyException;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.backup.HFileArchiver;
import org.apache.hadoop.hbase.client.Append;
@@ -2465,6 +2468,7 @@ public class HRegion implements HeapSize { // , Writable{
}
noOfDeletes++;
}
+ rewriteCellTags(familyMaps[i], mutation);
}
lock(this.updatesLock.readLock(), numReadyToWrite);
@@ -2877,6 +2881,7 @@ public class HRegion implements HeapSize { // , Writable{
closeRegionOperation();
}
}
+
private void doBatchMutate(Mutation mutation) throws IOException, DoNotRetryIOException {
// Currently this is only called for puts and deletes, so no nonces.
OperationStatus[] batchMutate = this.batchMutate(new Mutation[] { mutation },
@@ -2928,6 +2933,59 @@ public class HRegion implements HeapSize { // , Writable{
}
}
+ /**
+ * Possibly rewrite incoming cell tags.
+ */
+ void rewriteCellTags(Map<byte[], List<Cell>> familyMap, final Mutation m) {
+ // Check if we have any work to do and early out otherwise
+ // Update these checks as more logic is added here
+
+ if (m.getTTL() == Long.MAX_VALUE) {
+ return;
+ }
+
+ // From this point we know we have some work to do
+
+ for (Map.Entry<byte[], List<Cell>> e: familyMap.entrySet()) {
+ List<Cell> cells = e.getValue();
+ assert cells instanceof RandomAccess;
+ int listSize = cells.size();
+ for (int i = 0; i < listSize; i++) {
+ Cell cell = cells.get(i);
+ List<Tag> newTags = new ArrayList<Tag>();
+ Iterator<Tag> tagIterator = CellUtil.tagsIterator(cell.getTagsArray(),
+ cell.getTagsOffset(), cell.getTagsLengthUnsigned());
+
+ // Carry forward existing tags
+
+ while (tagIterator.hasNext()) {
+
+ // Add any filters or tag specific rewrites here
+
+ newTags.add(tagIterator.next());
+ }
+
+ // Cell TTL handling
+
+ // Check again if we need to add a cell TTL because early out logic
+ // above may change when there are more tag based features in core.
+ if (m.getTTL() != Long.MAX_VALUE) {
+ // Add a cell TTL tag
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(m.getTTL())));
+ }
+
+ // Rewrite the cell with the updated set of tags
+
+ cells.set(i, new KeyValue(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength(),
+ cell.getFamilyArray(), cell.getFamilyOffset(), cell.getFamilyLength(),
+ cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength(),
+ cell.getTimestamp(), KeyValue.Type.codeToType(cell.getTypeByte()),
+ cell.getValueArray(), cell.getValueOffset(), cell.getValueLength(),
+ newTags));
+ }
+ }
+ }
+
/*
* Check if resources to support an update.
*
@@ -5047,6 +5105,9 @@ public class HRegion implements HeapSize { // , Writable{
processor.preBatchMutate(this, walEdit);
// 7. Apply to memstore
for (Mutation m : mutations) {
+ // Handle any tag based cell features
+ rewriteCellTags(m.getFamilyCellMap(), m);
+
for (CellScanner cellScanner = m.cellScanner(); cellScanner.advance();) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cellScanner.current());
kv.setMvccVersion(writeEntry.getWriteNumber());
@@ -5173,8 +5234,9 @@ public class HRegion implements HeapSize { // , Writable{
return append(append, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
- // TODO: There's a lot of boiler plate code identical
- // to increment... See how to better unify that.
+ // TODO: There's a lot of boilerplate code identical to increment.
+ // We should refactor append and increment as local get-mutate-put
+ // transactions, so all stores only go through one code path for puts.
/**
* Perform one or more append operations on a row.
*
@@ -5242,67 +5304,111 @@ public class HRegion implements HeapSize { // , Writable{
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the append value
- // Avoid as much copying as possible. Every byte is copied at most
- // once.
+ // Avoid as much copying as possible. We may need to rewrite and
+ // consolidate tags. Bytes are only copied once.
// Would be nice if KeyValue had scatter/gather logic
int idx = 0;
for (Cell cell : family.getValue()) {
KeyValue kv = KeyValueUtil.ensureKeyValue(cell);
- KeyValue newKV;
+ KeyValue newKv;
KeyValue oldKv = null;
if (idx < results.size()
- && CellUtil.matchingQualifier(results.get(idx),kv)) {
+ && CellUtil.matchingQualifier(results.get(idx), kv)) {
oldKv = KeyValueUtil.ensureKeyValue(results.get(idx));
- // allocate an empty kv once
long ts = Math.max(now, oldKv.getTimestamp());
- newKV = new KeyValue(row.length, kv.getFamilyLength(),
+
+ // Process cell tags
+ List<Tag> newTags = new ArrayList<Tag>();
+
+ // Make a union of the set of tags in the old and new KVs
+
+ if (oldKv.getTagsLengthUnsigned() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(oldKv.getTagsArray(),
+ oldKv.getTagsOffset(), oldKv.getTagsLengthUnsigned());
+ while (i.hasNext()) {
+ newTags.add(i.next());
+ }
+ }
+ if (kv.getTagsLengthUnsigned() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(kv.getTagsArray(), kv.getTagsOffset(),
+ kv.getTagsLengthUnsigned());
+ while (i.hasNext()) {
+ newTags.add(i.next());
+ }
+ }
+
+ // Cell TTL handling
+
+ if (append.getTTL() != Long.MAX_VALUE) {
+ // Add the new TTL tag
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+ }
+
+ // Rebuild tags
+ byte[] tagBytes = Tag.fromList(newTags);
+
+ // allocate an empty cell once
+ newKv = new KeyValue(row.length, kv.getFamilyLength(),
kv.getQualifierLength(), ts, KeyValue.Type.Put,
oldKv.getValueLength() + kv.getValueLength(),
- oldKv.getTagsLengthUnsigned() + kv.getTagsLengthUnsigned());
- // copy in the value
- System.arraycopy(oldKv.getBuffer(), oldKv.getValueOffset(),
- newKV.getBuffer(), newKV.getValueOffset(),
- oldKv.getValueLength());
- System.arraycopy(kv.getBuffer(), kv.getValueOffset(),
- newKV.getBuffer(),
- newKV.getValueOffset() + oldKv.getValueLength(),
- kv.getValueLength());
- // copy in the tags
- System.arraycopy(oldKv.getBuffer(), oldKv.getTagsOffset(), newKV.getBuffer(),
- newKV.getTagsOffset(), oldKv.getTagsLengthUnsigned());
- System.arraycopy(kv.getBuffer(), kv.getTagsOffset(), newKV.getBuffer(),
- newKV.getTagsOffset() + oldKv.getTagsLengthUnsigned(),
- kv.getTagsLengthUnsigned());
+ tagBytes.length);
// copy in row, family, and qualifier
- System.arraycopy(kv.getBuffer(), kv.getRowOffset(),
- newKV.getBuffer(), newKV.getRowOffset(), kv.getRowLength());
- System.arraycopy(kv.getBuffer(), kv.getFamilyOffset(),
- newKV.getBuffer(), newKV.getFamilyOffset(),
- kv.getFamilyLength());
- System.arraycopy(kv.getBuffer(), kv.getQualifierOffset(),
- newKV.getBuffer(), newKV.getQualifierOffset(),
- kv.getQualifierLength());
+ System.arraycopy(kv.getRowArray(), kv.getRowOffset(),
+ newKv.getRowArray(), newKv.getRowOffset(), kv.getRowLength());
+ System.arraycopy(kv.getFamilyArray(), kv.getFamilyOffset(),
+ newKv.getFamilyArray(), newKv.getFamilyOffset(),
+ kv.getFamilyLength());
+ System.arraycopy(kv.getQualifierArray(), kv.getQualifierOffset(),
+ newKv.getQualifierArray(), newKv.getQualifierOffset(),
+ kv.getQualifierLength());
+ // copy in the value
+ System.arraycopy(oldKv.getValueArray(), oldKv.getValueOffset(),
+ newKv.getValueArray(), newKv.getValueOffset(),
+ oldKv.getValueLength());
+ System.arraycopy(kv.getValueArray(), kv.getValueOffset(),
+ newKv.getValueArray(),
+ newKv.getValueOffset() + oldKv.getValueLength(),
+ kv.getValueLength());
+ // Copy in tag data
+ System.arraycopy(tagBytes, 0, newKv.getTagsArray(), newKv.getTagsOffset(),
+ tagBytes.length);
idx++;
} else {
- newKV = kv;
// Append's KeyValue.Type==Put and ts==HConstants.LATEST_TIMESTAMP,
// so only need to update the timestamp to 'now'
- newKV.updateLatestStamp(Bytes.toBytes(now));
+ kv.updateLatestStamp(Bytes.toBytes(now));
+
+ // Cell TTL handling
+
+ if (append.getTTL() != Long.MAX_VALUE) {
+ List<Tag> newTags = new ArrayList<Tag>(1);
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(append.getTTL())));
+ // Add the new TTL tag
+ newKv = new KeyValue(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength(),
+ kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),
+ kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(),
+ kv.getTimestamp(), KeyValue.Type.codeToType(kv.getTypeByte()),
+ kv.getValueArray(), kv.getValueOffset(), kv.getValueLength(),
+ newTags);
+ } else {
+ newKv = kv;
+ }
}
- newKV.setMvccVersion(w.getWriteNumber());
+ newKv.setMvccVersion(w.getWriteNumber());
+
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
- newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
- RegionObserver.MutationType.APPEND, append, oldKv, (Cell) newKV));
+ newKv = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
+ RegionObserver.MutationType.APPEND, append, oldKv, (Cell) newKv));
}
- kvs.add(newKV);
+ kvs.add(newKv);
// Append update to WAL
if (writeToWAL) {
if (walEdits == null) {
walEdits = new WALEdit();
}
- walEdits.add(newKV);
+ walEdits.add(newKv);
}
}
@@ -5374,6 +5480,9 @@ public class HRegion implements HeapSize { // , Writable{
return increment(increment, HConstants.NO_NONCE, HConstants.NO_NONCE);
}
+ // TODO: There's a lot of boilerplate code identical to append.
+ // We should refactor append and increment as local get-mutate-put
+ // transactions, so all stores only go through one code path for puts.
/**
* Perform one or more increment operations on a row.
* @param increment
@@ -5442,13 +5551,23 @@ public class HRegion implements HeapSize { // , Writable{
// Iterate the input columns and update existing values if they were
// found, otherwise add new column initialized to the increment amount
int idx = 0;
- for (Cell kv: family.getValue()) {
- long amount = Bytes.toLong(CellUtil.cloneValue(kv));
+ for (Cell cell: family.getValue()) {
+ long amount = Bytes.toLong(CellUtil.cloneValue(cell));
boolean noWriteBack = (amount == 0);
+ List<Tag> newTags = new ArrayList<Tag>();
+
+ // Carry forward any tags that might have been added by a coprocessor
+ if (cell.getTagsLengthUnsigned() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(),
+ cell.getTagsOffset(), cell.getTagsLengthUnsigned());
+ while (i.hasNext()) {
+ newTags.add(i.next());
+ }
+ }
Cell c = null;
long ts = now;
- if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), kv)) {
+ if (idx < results.size() && CellUtil.matchingQualifier(results.get(idx), cell)) {
c = results.get(idx);
ts = Math.max(now, c.getTimestamp());
if(c.getValueLength() == Bytes.SIZEOF_LONG) {
@@ -5458,48 +5577,52 @@ public class HRegion implements HeapSize { // , Writable{
throw new org.apache.hadoop.hbase.DoNotRetryIOException(
"Attempted to increment field that isn't 64 bits wide");
}
+ // Carry tags forward from previous version
+ if (c.getTagsLength() > 0) {
+ Iterator<Tag> i = CellUtil.tagsIterator(c.getTagsArray(),
+ c.getTagsOffset(), c.getTagsLength());
+ while (i.hasNext()) {
+ newTags.add(i.next());
+ }
+ }
idx++;
}
// Append new incremented KeyValue to list
- byte[] q = CellUtil.cloneQualifier(kv);
+ byte[] q = CellUtil.cloneQualifier(cell);
byte[] val = Bytes.toBytes(amount);
- int oldCellTagsLen = (c == null) ? 0 : c.getTagsLengthUnsigned();
- int incCellTagsLen = kv.getTagsLengthUnsigned();
- KeyValue newKV = new KeyValue(row.length, family.getKey().length, q.length, ts,
- KeyValue.Type.Put, val.length, oldCellTagsLen + incCellTagsLen);
- System.arraycopy(row, 0, newKV.getBuffer(), newKV.getRowOffset(), row.length);
- System.arraycopy(family.getKey(), 0, newKV.getBuffer(), newKV.getFamilyOffset(),
- family.getKey().length);
- System.arraycopy(q, 0, newKV.getBuffer(), newKV.getQualifierOffset(), q.length);
- // copy in the value
- System.arraycopy(val, 0, newKV.getBuffer(), newKV.getValueOffset(), val.length);
- // copy tags
- if (oldCellTagsLen > 0) {
- System.arraycopy(c.getTagsArray(), c.getTagsOffset(), newKV.getBuffer(),
- newKV.getTagsOffset(), oldCellTagsLen);
- }
- if (incCellTagsLen > 0) {
- System.arraycopy(kv.getTagsArray(), kv.getTagsOffset(), newKV.getBuffer(),
- newKV.getTagsOffset() + oldCellTagsLen, incCellTagsLen);
+
+ // Add the TTL tag if the mutation carried one
+ if (increment.getTTL() != Long.MAX_VALUE) {
+ newTags.add(new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(increment.getTTL())));
}
- newKV.setMvccVersion(w.getWriteNumber());
+
+ KeyValue newKv = new KeyValue(row, 0, row.length,
+ family.getKey(), 0, family.getKey().length,
+ q, 0, q.length,
+ ts,
+ KeyValue.Type.Put,
+ val, 0, val.length,
+ newTags);
+
+ newKv.setMvccVersion(w.getWriteNumber());
+
// Give coprocessors a chance to update the new cell
if (coprocessorHost != null) {
- newKV = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
- RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKV));
+ newKv = KeyValueUtil.ensureKeyValue(coprocessorHost.postMutationBeforeWAL(
+ RegionObserver.MutationType.INCREMENT, increment, c, (Cell) newKv));
}
- allKVs.add(newKV);
+ allKVs.add(newKv);
if (!noWriteBack) {
- kvs.add(newKV);
+ kvs.add(newKv);
// Prepare WAL updates
if (writeToWAL) {
if (walEdits == null) {
walEdits = new WALEdit();
}
- walEdits.add(newKV);
+ walEdits.add(newKv);
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 39a0677..5ea1a9c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CompoundConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
@@ -57,6 +58,8 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Tag;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.compress.Compression;
import org.apache.hadoop.hbase.io.crypto.Cipher;
@@ -1597,8 +1600,38 @@ public class HStore implements Store {
return wantedVersions > maxVersions ? maxVersions: wantedVersions;
}
- static boolean isExpired(final KeyValue key, final long oldestTimestamp) {
- return key.getTimestamp() < oldestTimestamp;
+ /**
+ * @param cell
+ * @param oldestTimestamp
+ * @return true if the cell is expired
+ */
+ static boolean isCellTTLExpired(final Cell cell, final long oldestTimestamp, final long now) {
+ // Do not create an Iterator or Tag objects unless the cell actually has
+ // tags
+ if (cell.getTagsLengthUnsigned() > 0) {
+ // Look for a TTL tag first. Use it instead of the family setting if
+ // found. If a cell has multiple TTLs, resolve the conflict by using the
+ // first tag encountered.
+ Iterator<Tag> i = CellUtil.tagsIterator(cell.getTagsArray(), cell.getTagsOffset(),
+ cell.getTagsLengthUnsigned());
+ while (i.hasNext()) {
+ Tag t = i.next();
+ if (TagType.TTL_TAG_TYPE == t.getType()) {
+ // Unlike the TTL in the schema, cell TTLs are stored in milliseconds,
+ // so there is no need to convert
+ long ts = cell.getTimestamp();
+ assert t.getTagLength() == Bytes.SIZEOF_LONG;
+ long ttl = Bytes.toLong(t.getBuffer(), t.getTagOffset(), t.getTagLength());
+ if (ts + ttl < now) {
+ return true;
+ }
+ // Per cell TTLs cannot extend lifetime beyond family settings. That
+ // check is not made here; stop at the first TTL tag and return false
+ break;
+ }
+ }
+ }
+ return false;
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
index 98921f4..2b56edb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
@@ -24,6 +24,7 @@ import java.util.NavigableSet;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeepDeletedCells;
import org.apache.hadoop.hbase.KeyValue;
@@ -100,6 +101,10 @@ public class ScanQueryMatcher {
private final long earliestPutTs;
private final long ttl;
+ /** The oldest timestamp we are interested in, based on TTL */
+ private final long oldestUnexpiredTS;
+ private final long now;
+
/** readPoint over which the KVs are unconditionally included */
protected long maxReadPointToTrackVersions;
@@ -152,7 +157,7 @@ public class ScanQueryMatcher {
*/
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
ScanType scanType, long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
- RegionCoprocessorHost regionCoprocessorHost) throws IOException {
+ long now, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
this.tr = scan.getTimeRange();
this.rowComparator = scanInfo.getComparator();
this.regionCoprocessorHost = regionCoprocessorHost;
@@ -162,6 +167,9 @@ public class ScanQueryMatcher {
scanInfo.getFamily());
this.filter = scan.getFilter();
this.earliestPutTs = earliestPutTs;
+ this.oldestUnexpiredTS = oldestUnexpiredTS;
+ this.now = now;
+
this.maxReadPointToTrackVersions = readPointToUse;
this.timeToPurgeDeletes = scanInfo.getTimeToPurgeDeletes();
this.ttl = oldestUnexpiredTS;
@@ -216,18 +224,18 @@ public class ScanQueryMatcher {
* @param scanInfo The store's immutable scan info
* @param columns
* @param earliestPutTs Earliest put seen in any of the store files.
- * @param oldestUnexpiredTS the oldest timestamp we are interested in,
- * based on TTL
+ * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
+ * @param now the current server time
* @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
* @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
* @param regionCoprocessorHost
* @throws IOException
*/
public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
- long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, byte[] dropDeletesFromRow,
+ long readPointToUse, long earliestPutTs, long oldestUnexpiredTS, long now, byte[] dropDeletesFromRow,
byte[] dropDeletesToRow, RegionCoprocessorHost regionCoprocessorHost) throws IOException {
this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs,
- oldestUnexpiredTS, regionCoprocessorHost);
+ oldestUnexpiredTS, now, regionCoprocessorHost);
Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null));
this.dropDeletesFromRow = dropDeletesFromRow;
this.dropDeletesToRow = dropDeletesToRow;
@@ -237,10 +245,10 @@ public class ScanQueryMatcher {
* Constructor for tests
*/
ScanQueryMatcher(Scan scan, ScanInfo scanInfo,
- NavigableSet<byte[]> columns, long oldestUnexpiredTS) throws IOException {
+ NavigableSet<byte[]> columns, long oldestUnexpiredTS, long now) throws IOException {
this(scan, scanInfo, columns, ScanType.USER_SCAN,
Long.MAX_VALUE, /* max Readpoint to track versions */
- HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, null);
+ HConstants.LATEST_TIMESTAMP, oldestUnexpiredTS, now, null);
}
/**
@@ -299,7 +307,6 @@ public class ScanQueryMatcher {
}
}
-
// optimize case.
if (this.stickyNextRow)
return MatchCode.SEEK_NEXT_ROW;
@@ -322,8 +329,13 @@ public class ScanQueryMatcher {
long timestamp = Bytes.toLong(bytes, initialOffset + keyLength - KeyValue.TIMESTAMP_TYPE_SIZE);
// check for early out based on timestamp alone
if (columns.isDone(timestamp)) {
- return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
+ return columns.getNextRowOrNextColumn(kv.getQualifierArray(), kv.getQualifierOffset(),
+ kv.getQualifierLength());
}
+ // check if the cell is expired by cell TTL
+ if (HStore.isCellTTLExpired(kv, this.oldestUnexpiredTS, this.now)) {
+ return MatchCode.SKIP;
+ }
/*
* The delete logic is pretty complicated now.
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index e8d5f1d..853e1bf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -76,6 +76,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
protected final Scan scan;
protected final NavigableSet<byte[]> columns;
protected final long oldestUnexpiredTS;
+ protected final long now;
protected final int minVersions;
/**
@@ -121,7 +122,8 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
explicitColumnQuery = numCol > 0;
this.scan = scan;
this.columns = columns;
- oldestUnexpiredTS = EnvironmentEdgeManager.currentTimeMillis() - ttl;
+ this.now = EnvironmentEdgeManager.currentTimeMillis();
+ this.oldestUnexpiredTS = now - ttl;
this.minVersions = minVersions;
if (store != null && ((HStore)store).getHRegion() != null
@@ -171,7 +173,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
}
matcher = new ScanQueryMatcher(scan, scanInfo, columns,
ScanType.USER_SCAN, Long.MAX_VALUE, HConstants.LATEST_TIMESTAMP,
- oldestUnexpiredTS, store.getCoprocessorHost());
+ oldestUnexpiredTS, now, store.getCoprocessorHost());
this.store.addChangedReaderObserver(this);
@@ -236,10 +238,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
((HStore)store).getHRegion().getReadpoint(IsolationLevel.READ_COMMITTED));
if (dropDeletesFromRow == null) {
matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType, smallestReadPoint,
- earliestPutTs, oldestUnexpiredTS, store.getCoprocessorHost());
+ earliestPutTs, oldestUnexpiredTS, now, store.getCoprocessorHost());
} else {
matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint, earliestPutTs,
- oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
+ oldestUnexpiredTS, now, dropDeletesFromRow, dropDeletesToRow, store.getCoprocessorHost());
}
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
@@ -279,7 +281,7 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
this(null, scan.getCacheBlocks(), scan, columns, scanInfo.getTtl(),
scanInfo.getMinVersions(), readPt);
this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
- Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, null);
+ Long.MAX_VALUE, earliestPutTs, oldestUnexpiredTS, now, null);
// In unit tests, the store could be null
if (this.store != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
new file mode 100644
index 0000000..062c05a
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/mapreduce/TestImportTSVWithTTLs.java
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.mapreduce;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Durability;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestImportTSVWithTTLs implements Configurable {
+
+ protected static final Log LOG = LogFactory.getLog(TestImportTSVWithTTLs.class);
+ protected static final String NAME = TestImportTsv.class.getSimpleName();
+ protected static HBaseTestingUtility util = new HBaseTestingUtility();
+
+ /**
+ * Delete the tmp directory after running doMROnTableTest. Boolean. Default is
+ * true.
+ */
+ protected static final String DELETE_AFTER_LOAD_CONF = NAME + ".deleteAfterLoad";
+
+ /**
+ * Force use of combiner in doMROnTableTest. Boolean. Default is true.
+ */
+ protected static final String FORCE_COMBINER_CONF = NAME + ".forceCombiner";
+
+ private final String FAMILY = "FAM";
+ private static Configuration conf;
+
+ @Override
+ public Configuration getConf() {
+ return util.getConfiguration();
+ }
+
+ @Override
+ public void setConf(Configuration conf) {
+ throw new IllegalArgumentException("setConf not supported");
+ }
+
+ @BeforeClass
+ public static void provisionCluster() throws Exception {
+ conf = util.getConfiguration();
+ // We don't check persistence in HFiles in this test, but if we ever do we will
+ // need this where the default hfile version is not 3 (i.e. 0.98)
+ conf.setInt("hfile.format.version", 3);
+ conf.set("hbase.coprocessor.region.classes", TTLCheckingObserver.class.getName());
+ util.startMiniCluster();
+ util.startMiniMapReduceCluster();
+ }
+
+ @AfterClass
+ public static void releaseCluster() throws Exception {
+ util.shutdownMiniMapReduceCluster();
+ util.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testMROnTable() throws Exception {
+ String tableName = "test-" + UUID.randomUUID();
+
+ // Prepare the arguments required for the test.
+ String[] args = new String[] {
+ "-D" + ImportTsv.MAPPER_CONF_KEY
+ + "=org.apache.hadoop.hbase.mapreduce.TsvImporterMapper",
+ "-D" + ImportTsv.COLUMNS_CONF_KEY + "=HBASE_ROW_KEY,FAM:A,FAM:B,HBASE_CELL_TTL",
+ "-D" + ImportTsv.SEPARATOR_CONF_KEY + "=\u001b", tableName };
+ String data = "KEY\u001bVALUE1\u001bVALUE2\u001b1000000\n";
+ util.createTable(tableName, FAMILY);
+ doMROnTableTest(util, FAMILY, data, args, 1);
+ util.deleteTable(tableName);
+ }
+
+ protected static Tool doMROnTableTest(HBaseTestingUtility util, String family, String data,
+ String[] args, int valueMultiplier) throws Exception {
+ TableName table = TableName.valueOf(args[args.length - 1]);
+ Configuration conf = new Configuration(util.getConfiguration());
+
+ // populate input file
+ FileSystem fs = FileSystem.get(conf);
+ Path inputPath = fs.makeQualified(new Path(util
+ .getDataTestDirOnTestFS(table.getNameAsString()), "input.dat"));
+ FSDataOutputStream op = fs.create(inputPath, true);
+ op.write(Bytes.toBytes(data));
+ op.close();
+ LOG.debug(String.format("Wrote test data to file: %s", inputPath));
+
+ if (conf.getBoolean(FORCE_COMBINER_CONF, true)) {
+ LOG.debug("Forcing combiner.");
+ conf.setInt("mapreduce.map.combine.minspills", 1);
+ }
+
+ // run the import
+ List<String> argv = new ArrayList<String>(Arrays.asList(args));
+ argv.add(inputPath.toString());
+ Tool tool = new ImportTsv();
+ LOG.debug("Running ImportTsv with arguments: " + argv);
+ try {
+ // Job will fail if observer rejects entries without TTL
+ assertEquals(0, ToolRunner.run(conf, tool, argv.toArray(args)));
+ } finally {
+ // Clean up
+ if (conf.getBoolean(DELETE_AFTER_LOAD_CONF, true)) {
+ LOG.debug("Deleting test subdirectory");
+ util.cleanupDataTestDirOnTestFS(table.getNameAsString());
+ }
+ }
+
+ return tool;
+ }
+
+ public static class TTLCheckingObserver extends BaseRegionObserver {
+
+ @Override
+ public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit,
+ Durability durability) throws IOException {
+ HRegion region = e.getEnvironment().getRegion();
+ if (!region.getRegionInfo().isMetaTable()
+ && !region.getRegionInfo().getTable().isSystemTable()) {
+ // The put carries the TTL attribute
+ if (put.getTTL() != Long.MAX_VALUE) {
+ return;
+ }
+ throw new IOException("Operation does not have TTL set");
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 777b3a5..b40a7eb 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -76,6 +76,7 @@ import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.RegionTooBusyException;
+import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.HRegionInfo;
@@ -88,6 +89,7 @@ import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestThread;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.TagType;
import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Append;
import org.apache.hadoop.hbase.client.Delete;
@@ -109,6 +111,7 @@ import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -127,6 +130,7 @@ import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.test.MetricsAssertHelper;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
@@ -5139,6 +5143,133 @@ public class TestHRegion {
}
}
+ /**
+ * Verifies cell expiry when per-cell TTL tags are combined with a 10 second
+ * column family TTL, driving the clock through an injected
+ * IncrementingEnvironmentEdge. Also checks that an Increment carrying its own
+ * TTL expires and exposes the older value underneath it.
+ *
+ * Time offsets in the comments below (T+n) are approximate offsets from the
+ * moment "now" is sampled.
+ */
+ @Test
+ public void testCellTTLs() throws IOException {
+ IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge();
+ EnvironmentEdgeManager.injectEdge(edge);
+
+ final byte[] row = Bytes.toBytes("testRow");
+ final byte[] q1 = Bytes.toBytes("q1");
+ final byte[] q2 = Bytes.toBytes("q2");
+ final byte[] q3 = Bytes.toBytes("q3");
+ final byte[] q4 = Bytes.toBytes("q4");
+
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testCellTTLs"));
+ HColumnDescriptor hcd = new HColumnDescriptor(fam1);
+ hcd.setTimeToLive(10); // 10 seconds
+ htd.addFamily(hcd);
+
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ conf.setInt(HFile.FORMAT_VERSION_KEY, HFile.MIN_FORMAT_VERSION_WITH_TAGS);
+
+ HRegion region = HRegion.createHRegion(new HRegionInfo(htd.getTableName(),
+ HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY),
+ TEST_UTIL.getDataTestDir(), conf, htd);
+ assertNotNull(region);
+ try {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ // Add a cell that will expire in 5 seconds via cell TTL
+ region.put(new Put(row).add(new KeyValue(row, fam1, q1, now,
+ HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+ // The TTL tag value is a time-to-live in milliseconds
+ new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+ // Add a cell that will expire after 10 seconds via family setting
+ region.put(new Put(row).add(fam1, q2, now, HConstants.EMPTY_BYTE_ARRAY));
+ // Add a cell that will expire in 15 seconds via cell TTL
+ region.put(new Put(row).add(new KeyValue(row, fam1, q3, now + 10000 - 1,
+ HConstants.EMPTY_BYTE_ARRAY, new Tag[] {
+ // The TTL tag value is a time-to-live in milliseconds
+ new Tag(TagType.TTL_TAG_TYPE, Bytes.toBytes(5000L)) } )));
+ // Add a cell that will expire in 20 seconds via family setting
+ region.put(new Put(row).add(fam1, q4, now + 10000 - 1, HConstants.EMPTY_BYTE_ARRAY));
+
+ // Flush so we are sure store scanning gets this right
+ region.flushcache();
+
+ // A query at time T+0 should return all cells
+ Result r = region.get(new Get(row));
+ assertNotNull(r.getValue(fam1, q1));
+ assertNotNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+5 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNotNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+10 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNotNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+15 seconds
+ edge.incrementTime(5000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNull(r.getValue(fam1, q3));
+ assertNotNull(r.getValue(fam1, q4));
+
+ // Increment time to T+25 seconds, past the T+20 expiry of the last cell
+ edge.incrementTime(10000);
+
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+ assertNull(r.getValue(fam1, q2));
+ assertNull(r.getValue(fam1, q3));
+ assertNull(r.getValue(fam1, q4));
+
+ // Fun with disappearing increments
+
+ // Start at 1
+ region.put(new Put(row).add(fam1, q1, Bytes.toBytes(1L)));
+ r = region.get(new Get(row));
+ byte[] val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(1L, Bytes.toLong(val));
+
+ // Increment with a TTL of 5 seconds
+ Increment incr = new Increment(row).addColumn(fam1, q1, 1L);
+ incr.setTTL(5000);
+ region.increment(incr); // 2
+
+ // New value should be 2
+ r = region.get(new Get(row));
+ val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(2L, Bytes.toLong(val));
+
+ // Increment time to T+30 seconds
+ edge.incrementTime(5000);
+
+ // Value should be back to 1
+ r = region.get(new Get(row));
+ val = r.getValue(fam1, q1);
+ assertNotNull(val);
+ assertEquals(1L, Bytes.toLong(val));
+
+ // Increment time to T+35 seconds
+ edge.incrementTime(5000);
+
+ // Original value written at T+25 should be gone now via family TTL
+ r = region.get(new Get(row));
+ assertNull(r.getValue(fam1, q1));
+
+ } finally {
+ HRegion.closeHRegion(region);
+ }
+ }
+
private static HRegion initHRegion(byte[] tableName, String callingMethod,
byte[]... families) throws IOException {
return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
index aafa710..2ef9838 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
@@ -91,11 +91,12 @@ public class TestQueryMatcher extends HBaseTestCase {
}
- private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
- // 2,4,5
+ private void _testMatch_ExplicitColumns(Scan scan, List<MatchCode> expected) throws IOException {
+ long now = EnvironmentEdgeManager.currentTimeMillis();
+ // 2,4,5
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), get.getFamilyMap().get(fam2),
- EnvironmentEdgeManager.currentTimeMillis() - ttl);
+ now - ttl, now);
List<KeyValue> memstore = new ArrayList<KeyValue>();
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -175,9 +176,10 @@ public class TestQueryMatcher extends HBaseTestCase {
expected.add(ScanQueryMatcher.MatchCode.INCLUDE);
expected.add(ScanQueryMatcher.MatchCode.DONE);
+ long now = EnvironmentEdgeManager.currentTimeMillis();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, ttl, KeepDeletedCells.FALSE, 0, rowComparator), null,
- EnvironmentEdgeManager.currentTimeMillis() - ttl);
+ now - ttl, now);
List<KeyValue> memstore = new ArrayList<KeyValue>();
memstore.add(new KeyValue(row1, fam2, col1, 1, data));
@@ -231,7 +233,7 @@ public class TestQueryMatcher extends HBaseTestCase {
long now = EnvironmentEdgeManager.currentTimeMillis();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), get.getFamilyMap().get(fam2),
- now - testTTL);
+ now - testTTL, now);
KeyValue [] kvs = new KeyValue[] {
new KeyValue(row1, fam2, col1, now-100, data),
@@ -285,7 +287,7 @@ public class TestQueryMatcher extends HBaseTestCase {
long now = EnvironmentEdgeManager.currentTimeMillis();
ScanQueryMatcher qm = new ScanQueryMatcher(scan, new ScanInfo(fam2,
0, 1, testTTL, KeepDeletedCells.FALSE, 0, rowComparator), null,
- now - testTTL);
+ now - testTTL, now);
KeyValue [] kvs = new KeyValue[] {
new KeyValue(row1, fam2, col1, now-100, data),
@@ -343,7 +345,7 @@ public class TestQueryMatcher extends HBaseTestCase {
NavigableSet<byte[]> cols = get.getFamilyMap().get(fam2);
ScanQueryMatcher qm = new ScanQueryMatcher(scan, scanInfo, cols, Long.MAX_VALUE,
- HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to, null);
+ HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, now, from, to, null);
List<ScanQueryMatcher.MatchCode> actual =
new ArrayList<ScanQueryMatcher.MatchCode>(rows.length);
byte[] prevRow = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
index f218a43..b00e988 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestTags.java
@@ -414,8 +414,13 @@ public class TestTags {
tags = TestCoprocessorForTags.tags;
assertEquals(5L, Bytes.toLong(kv.getValueArray(), kv.getValueOffset(), kv.getValueLength()));
assertEquals(2, tags.size());
- assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
- assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+ // We cannot assume the ordering of tags
+ List<String> tagValues = new ArrayList<String>();
+ for (Tag tag: tags) {
+ tagValues.add(Bytes.toString(tag.getValue()));
+ }
+ assertTrue(tagValues.contains("tag1"));
+ assertTrue(tagValues.contains("tag2"));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
@@ -471,8 +476,13 @@ public class TestTags {
kv = KeyValueUtil.ensureKeyValue(result.getColumnLatestCell(f, q));
tags = TestCoprocessorForTags.tags;
assertEquals(2, tags.size());
- assertEquals("tag1", Bytes.toString(tags.get(0).getValue()));
- assertEquals("tag2", Bytes.toString(tags.get(1).getValue()));
+ // We cannot assume the ordering of tags
+ tagValues.clear();
+ for (Tag tag: tags) {
+ tagValues.add(Bytes.toString(tag.getValue()));
+ }
+ assertTrue(tagValues.contains("tag1"));
+ assertTrue(tagValues.contains("tag2"));
TestCoprocessorForTags.checkTagPresence = false;
TestCoprocessorForTags.tags = null;
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-shell/src/main/ruby/hbase/table.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/main/ruby/hbase/table.rb b/hbase-shell/src/main/ruby/hbase/table.rb
index 3fe006b..10b754a 100644
--- a/hbase-shell/src/main/ruby/hbase/table.rb
+++ b/hbase-shell/src/main/ruby/hbase/table.rb
@@ -136,17 +136,19 @@ EOF
set_attributes(p, attributes) if attributes
visibility = args[VISIBILITY]
set_cell_visibility(p, visibility) if visibility
+ ttl = args[TTL]
+ set_op_ttl(p, ttl) if ttl
end
#Case where attributes are specified without timestamp
if timestamp.kind_of?(Hash)
timestamp.each do |k, v|
- if v.kind_of?(Hash)
- set_attributes(p, v) if v
- end
- if v.kind_of?(String)
- set_cell_visibility(p, v) if v
- end
-
+ if k == 'ATTRIBUTES'
+ set_attributes(p, v)
+ elsif k == 'VISIBILITY'
+ set_cell_visibility(p, v)
+ elsif k == "TTL"
+ set_op_ttl(p, v)
+ end
end
timestamp = nil
end
@@ -214,6 +216,8 @@ EOF
visibility = args[VISIBILITY]
set_attributes(incr, attributes) if attributes
set_cell_visibility(incr, visibility) if visibility
+ ttl = args[TTL]
+ set_op_ttl(incr, ttl) if ttl
end
incr.addColumn(family, qualifier, value)
@table.increment(incr)
@@ -232,6 +236,8 @@ EOF
visibility = args[VISIBILITY]
set_attributes(append, attributes) if attributes
set_cell_visibility(append, visibility) if visibility
+ ttl = args[TTL]
+ set_op_ttl(append, ttl) if ttl
end
append.add(family, qualifier, value.to_s.to_java_bytes)
@table.append(append)
@@ -532,6 +538,10 @@ EOF
auths.to_java(:string)))
end
+ def set_op_ttl(op, ttl)
+ op.setTTL(ttl.to_java(:long))
+ end
+
#----------------------------
# Add general administration utilities to the shell
# each of the names below adds this method name to the table
http://git-wip-us.apache.org/repos/asf/hbase/blob/869c5665/hbase-shell/src/test/ruby/hbase/table_test.rb
----------------------------------------------------------------------
diff --git a/hbase-shell/src/test/ruby/hbase/table_test.rb b/hbase-shell/src/test/ruby/hbase/table_test.rb
index 7272229..fa2990d 100644
--- a/hbase-shell/src/test/ruby/hbase/table_test.rb
+++ b/hbase-shell/src/test/ruby/hbase/table_test.rb
@@ -530,5 +530,18 @@ module Hbase
end
end
+ define_test "mutation with TTL should expire" do
+ @test_table.put('ttlTest', 'x:a', 'foo', { TTL => 1000 } )
+ begin
+ res = @test_table._get_internal('ttlTest', 'x:a')
+ assert_not_nil(res)
+ sleep 2
+ res = @test_table._get_internal('ttlTest', 'x:a')
+ assert_nil(res)
+ ensure
+ @test_table.delete('ttlTest', 'x:a')
+ end
+ end
+
end
end