Posted to commits@hbase.apache.org by la...@apache.org on 2011/10/21 21:57:08 UTC
svn commit: r1187531 [1/2] - in /hbase/trunk: ./ src/docbkx/
src/main/java/org/apache/hadoop/hbase/
src/main/java/org/apache/hadoop/hbase/client/
src/main/java/org/apache/hadoop/hbase/regionserver/ src/main/ruby/hbase/
src/test/java/org/apache/hadoop/h...
Author: larsh
Date: Fri Oct 21 19:57:07 2011
New Revision: 1187531
URL: http://svn.apache.org/viewvc?rev=1187531&view=rev
Log:
HBASE-4536 Allow CF to retain deleted rows
Added:
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/docbkx/book.xml
hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Attributes.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
hbase/trunk/src/main/ruby/hbase/admin.rb
hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestScanWildcardColumnTracker.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestStoreScanner.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Fri Oct 21 19:57:07 2011
@@ -2,6 +2,7 @@ HBase Change Log
Release 0.93.0 - Unreleased
NEW FEATURE
HBASE-4460 Support running an embedded ThriftServer within a RegionServer (jgray)
+ HBASE-4536 Allow CF to retain deleted rows (Lars H)
IMPROVEMENT
HBASE-4132 Extend the WALActionsListener API to accommodate log archival
Modified: hbase/trunk/src/docbkx/book.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/src/docbkx/book.xml?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/docbkx/book.xml (original)
+++ hbase/trunk/src/docbkx/book.xml Fri Oct 21 19:57:07 2011
@@ -670,14 +670,14 @@ admin.enableTable(table);
<title>
Minimum Number of Versions
</title>
- <para>Like number of max row versions, the minimum number of row versions to keep is configured per column
+ <para>Like maximum number of row versions, the minimum number of row versions to keep is configured per column
family via <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/HColumnDescriptor.html">HColumnDescriptor</link>.
The default for min versions is 0, which means the feature is disabled.
The minimum number of row versions parameter is used together with the time-to-live parameter and can be combined with the
number of row versions parameter to allow configurations such as
"keep the last T minutes worth of data, at most N versions, <emphasis>but keep at least M versions around</emphasis>"
- (where M is the value for minimum number of row versions, M<=N).
- This parameter should only be set when time-to-live is enabled for a column family and must be less or equal to the
+ (where M is the value for minimum number of row versions, M<N).
+ This parameter should only be set when time-to-live is enabled for a column family and must be less than the
number of row versions.
</para>
</section>
@@ -723,6 +723,23 @@ admin.enableTable(table);
<para>See <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/HColumnDescriptor.html">HColumnDescriptor</link> for more information.
</para>
</section>
+ <section xml:id="cf.keep.deleted">
+ <title>
+ Keeping Deleted Cells
+ </title>
+ <para>ColumnFamilies can optionally keep deleted cells. That means deleted cells can still be retrieved with
+ <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Get.html">Get</link> or
+ <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/client/Scan.html">Scan</link> operations,
+ as long as these operations have a time range specified that ends before the timestamp of any delete that would affect the cells.
+ This allows for point-in-time queries even in the presence of deletes.
+ </para>
+ <para>
+ Deleted cells are still subject to TTL and there will never be more than "maximum number of versions" deleted cells.
+ A new "raw" scan option returns all deleted cells and the delete markers.
+ </para>
+ <para>See <link xlink:href="http://hbase.apache.org/apidocs/org/apache/hadoop/hbase/HColumnDescriptor.html">HColumnDescriptor</link> for more information.
+ </para>
+ </section>
<section xml:id="schema.bloom">
<title>Bloom Filters</title>
<para>Bloom Filters can be enabled per-ColumnFamily.
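The new section above describes point-in-time reads over a family that keeps deleted cells. As an illustration (not part of the patch), a minimal client-side sketch; the table name, row, and timestamps are hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Get;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.util.Bytes;

    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "t");   // hypothetical table, CF has KEEP_DELETED_CELLS
    long deleteTs = 1319227027000L;         // hypothetical timestamp of the delete

    // The time range ends at the delete's timestamp (the end is exclusive),
    // so the read sees the cells as they were just before the delete.
    Get get = new Get(Bytes.toBytes("row1"));
    get.setTimeRange(0, deleteTs);
    Result result = table.get(get);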
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/HColumnDescriptor.java Fri Oct 21 19:57:07 2011
@@ -88,6 +88,7 @@ public class HColumnDescriptor implement
public static final String FOREVER = "FOREVER";
public static final String REPLICATION_SCOPE = "REPLICATION_SCOPE";
public static final String MIN_VERSIONS = "MIN_VERSIONS";
+ public static final String KEEP_DELETED_CELLS = "KEEP_DELETED_CELLS";
/**
* Default compression type.
@@ -117,6 +118,11 @@ public class HColumnDescriptor implement
public static final boolean DEFAULT_IN_MEMORY = false;
/**
+ * Default setting for preventing deleted cells from being collected immediately.
+ */
+ public static final boolean DEFAULT_KEEP_DELETED = false;
+
+ /**
* Default setting for whether to use a block cache or not.
*/
public static final boolean DEFAULT_BLOCKCACHE = true;
@@ -151,6 +157,7 @@ public class HColumnDescriptor implement
DEFAULT_VALUES.put(BLOCKSIZE, String.valueOf(DEFAULT_BLOCKSIZE));
DEFAULT_VALUES.put(HConstants.IN_MEMORY, String.valueOf(DEFAULT_IN_MEMORY));
DEFAULT_VALUES.put(BLOCKCACHE, String.valueOf(DEFAULT_BLOCKCACHE));
+ DEFAULT_VALUES.put(KEEP_DELETED_CELLS, String.valueOf(DEFAULT_KEEP_DELETED));
}
// Column family name
@@ -265,8 +272,9 @@ public class HColumnDescriptor implement
final String compression, final boolean inMemory,
final boolean blockCacheEnabled, final int blocksize,
final int timeToLive, final String bloomFilter, final int scope) {
- this(familyName, DEFAULT_MIN_VERSIONS, maxVersions, compression, inMemory,
- blockCacheEnabled, blocksize, timeToLive, bloomFilter, scope);
+ this(familyName, DEFAULT_MIN_VERSIONS, maxVersions, DEFAULT_KEEP_DELETED,
+ compression, inMemory, blockCacheEnabled, blocksize, timeToLive,
+ bloomFilter, scope);
}
/**
@@ -275,6 +283,8 @@ public class HColumnDescriptor implement
* letter -- and may not contain a <code>:<code>
* @param minVersions Minimum number of versions to keep
* @param maxVersions Maximum number of versions to keep
+ * @param keepDeletedCells Whether to retain deleted cells until they expire;
+ * at most maxVersions versions are kept.
* @param compression Compression type
* @param inMemory If true, column data should be kept in an HRegionServer's
* cache
@@ -292,8 +302,9 @@ public class HColumnDescriptor implement
* a <code>:</code>
* @throws IllegalArgumentException if the number of versions is <= 0
*/
- public HColumnDescriptor(final byte [] familyName, final int minVersions,
- final int maxVersions, final String compression, final boolean inMemory,
+ public HColumnDescriptor(final byte[] familyName, final int minVersions,
+ final int maxVersions, final boolean keepDeletedCells,
+ final String compression, final boolean inMemory,
final boolean blockCacheEnabled, final int blocksize,
final int timeToLive, final String bloomFilter, final int scope) {
isLegalFamilyName(familyName);
@@ -309,14 +320,15 @@ public class HColumnDescriptor implement
if (timeToLive == HConstants.FOREVER) {
throw new IllegalArgumentException("Minimum versions requires TTL.");
}
- if (minVersions > maxVersions) {
- throw new IllegalArgumentException("Minimum versions must be <= "+
- "maximum versions.");
+ if (minVersions >= maxVersions) {
+ throw new IllegalArgumentException("Minimum versions must be < "
+ + "maximum versions.");
}
}
setMaxVersions(maxVersions);
setMinVersions(minVersions);
+ setKeepDeletedCells(keepDeletedCells);
setInMemory(inMemory);
setBlockCacheEnabled(blockCacheEnabled);
setTimeToLive(timeToLive);
@@ -542,6 +554,22 @@ public class HColumnDescriptor implement
setValue(HConstants.IN_MEMORY, Boolean.toString(inMemory));
}
+ public boolean getKeepDeletedCells() {
+ String value = getValue(KEEP_DELETED_CELLS);
+ if (value != null) {
+ return Boolean.valueOf(value).booleanValue();
+ }
+ return DEFAULT_KEEP_DELETED;
+ }
+
+ /**
+ * @param keepDeletedCells True if deleted cells should not be collected
+ * immediately.
+ */
+ public void setKeepDeletedCells(boolean keepDeletedCells) {
+ setValue(KEEP_DELETED_CELLS, Boolean.toString(keepDeletedCells));
+ }
+
/**
* @return Time-to-live of cell contents, in seconds.
*/
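For reference, a sketch (not part of the patch) of the new constructor parameter; the family name and values here are hypothetical, and note that minVersions must now be strictly less than maxVersions:

    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HConstants;
    import org.apache.hadoop.hbase.util.Bytes;

    HColumnDescriptor hcd = new HColumnDescriptor(Bytes.toBytes("cf"),
        1,                                       // minVersions (> 0 requires a TTL)
        3,                                       // maxVersions (must be > minVersions)
        true,                                    // keepDeletedCells
        HColumnDescriptor.DEFAULT_COMPRESSION,
        HColumnDescriptor.DEFAULT_IN_MEMORY,
        HColumnDescriptor.DEFAULT_BLOCKCACHE,
        HColumnDescriptor.DEFAULT_BLOCKSIZE,
        600,                                     // time-to-live, in seconds
        HColumnDescriptor.DEFAULT_BLOOMFILTER,
        HConstants.REPLICATION_SCOPE_LOCAL);

    // Equivalently, toggle the setting on an existing descriptor:
    hcd.setKeepDeletedCells(true);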
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/KeyValue.java Fri Oct 21 19:57:07 2011
@@ -207,6 +207,14 @@ public class KeyValue implements Writabl
// the row cached
private volatile byte [] rowCache = null;
+ /**
+ * @return True if a delete type, a {@link KeyValue.Type#Delete} or
+ * a {@link KeyValue.Type#DeleteFamily} or a {@link KeyValue.Type#DeleteColumn}
+ * KeyValue type.
+ */
+ public static boolean isDelete(byte t) {
+ return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
+ }
/** Here be dragons **/
@@ -1038,8 +1046,7 @@ public class KeyValue implements Writabl
* KeyValue type.
*/
public boolean isDelete() {
- int t = getType();
- return Type.Delete.getCode() <= t && t <= Type.DeleteFamily.getCode();
+ return KeyValue.isDelete(getType());
}
/**
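The static overload lets callers classify a raw type byte without materializing a KeyValue, which the column trackers below rely on. A small sketch (not part of the patch; plain assertions, hypothetical values):

    byte put = KeyValue.Type.Put.getCode();
    byte deleteColumn = KeyValue.Type.DeleteColumn.getCode();
    // All delete variants have type codes in [Delete, DeleteFamily]; Put does not.
    assert !KeyValue.isDelete(put);
    assert KeyValue.isDelete(deleteColumn);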
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Attributes.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Attributes.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Attributes.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Attributes.java Fri Oct 21 19:57:07 2011
@@ -26,6 +26,7 @@ public interface Attributes {
/**
* Sets an attribute.
* In case value = null attribute is removed from the attributes map.
+ * Attribute names starting with _ indicate system attributes.
* @param name attribute name
* @param value attribute value
*/
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/Scan.java Fri Oct 21 19:57:07 2011
@@ -82,12 +82,13 @@ import java.util.TreeSet;
* execute {@link #setCacheBlocks(boolean)}.
*/
public class Scan extends OperationWithAttributes implements Writable {
+ private static final String RAW_ATTR = "_raw_";
+
private static final byte SCAN_VERSION = (byte)2;
private byte [] startRow = HConstants.EMPTY_START_ROW;
private byte [] stopRow = HConstants.EMPTY_END_ROW;
private int maxVersions = 1;
private int batch = -1;
-
// If application wants to collect scan metrics, it needs to
// call scan.setAttribute(SCAN_ATTRIBUTES_ENABLE, Bytes.toBytes(Boolean.TRUE))
static public String SCAN_ATTRIBUTES_METRICS_ENABLE =
@@ -695,4 +696,26 @@ public class Scan extends OperationWithA
}
return cols.toString();
}
+
+ /**
+ * Enable/disable "raw" mode for this scan.
+ * If "raw" is enabled the scan will return all
+ * delete markers and deleted cells that have not
+ * been collected yet.
+ * This is mostly useful for Scans on column families
+ * that have KEEP_DELETED_CELLS enabled.
+ * It is an error to specify any column when "raw" is set.
+ * @param raw True/False to enable/disable "raw" mode.
+ */
+ public void setRaw(boolean raw) {
+ setAttribute(RAW_ATTR, Bytes.toBytes(raw));
+ }
+
+ /**
+ * @return True if this Scan is in "raw" mode.
+ */
+ public boolean isRaw() {
+ byte[] attr = getAttribute(RAW_ATTR);
+ return attr == null ? false : Bytes.toBoolean(attr);
+ }
}
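A usage sketch of raw mode (not part of the patch; "table" is a hypothetical HTable, and this mirrors the client-side test added below):

    Scan scan = new Scan();
    scan.setRaw(true);      // include delete markers and not-yet-collected deleted cells
    scan.setMaxVersions();  // raw scans usually want every version
    // Note: adding explicit columns to a raw scan is rejected server-side
    // (see the StoreScanner change below).
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
      for (KeyValue kv : result.raw()) {
        System.out.println(kv + (kv.isDelete() ? " <- delete marker" : ""));
      }
    }
    scanner.close();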
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ColumnTracker.java Fri Oct 21 19:57:07 2011
@@ -47,12 +47,14 @@ public interface ColumnTracker {
* @param offset
* @param length
* @param ttl The timeToLive to enforce.
+ * @param type The type of the KeyValue
* @return The match code instance.
* @throws IOException in case there is an internal consistency problem
* caused by a data corruption.
*/
- public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
- int length, long ttl) throws IOException;
+ public ScanQueryMatcher.MatchCode checkColumn(byte[] bytes, int offset,
+ int length, long ttl, byte type)
+ throws IOException;
/**
* Updates internal variables in between files
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ExplicitColumnTracker.java Fri Oct 21 19:57:07 2011
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.NavigableSet;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.util.Bytes;
@@ -97,16 +98,14 @@ public class ExplicitColumnTracker imple
}
/**
- * Checks against the parameters of the query and the columns which have
- * already been processed by this query.
- * @param bytes KeyValue buffer
- * @param offset offset to the start of the qualifier
- * @param length length of the qualifier
- * @param timestamp timestamp of the key being checked
- * @return MatchCode telling ScanQueryMatcher what action to take
+ * {@inheritDoc}
*/
+ @Override
public ScanQueryMatcher.MatchCode checkColumn(byte [] bytes, int offset,
- int length, long timestamp) {
+ int length, long timestamp, byte type) {
+ // delete markers should never be passed to an
+ // *Explicit*ColumnTracker
+ assert !KeyValue.isDelete(type);
do {
// No more columns left, we are done with this query
if(this.columns.size() == 0) {
@@ -143,12 +142,12 @@ public class ExplicitColumnTracker imple
if (this.columns.size() == this.index) {
// We have served all the requested columns.
this.column = null;
- return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
+ return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
} else {
- // We are done with current column; advance to next column
- // of interest.
+ // We are done with current column; advance to next column
+ // of interest.
this.column = this.columns.get(this.index);
- return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
+ return ScanQueryMatcher.MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
}
} else {
setTS(timestamp);
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Fri Oct 21 19:57:07 2011
@@ -20,6 +20,9 @@
package org.apache.hadoop.hbase.regionserver;
+import java.io.IOException;
+import java.util.NavigableSet;
+
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
@@ -29,8 +32,7 @@ import org.apache.hadoop.hbase.io.TimeRa
import org.apache.hadoop.hbase.regionserver.DeleteTracker.DeleteResult;
import org.apache.hadoop.hbase.util.Bytes;
-import java.io.IOException;
-import java.util.NavigableSet;
+import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
/**
* A query matcher that is specifically designed for the scan case.
@@ -39,72 +41,104 @@ public class ScanQueryMatcher {
// Optimization so we can skip lots of compares when we decide to skip
// to the next row.
private boolean stickyNextRow;
- private byte[] stopRow;
+ private final byte[] stopRow;
- protected TimeRange tr;
+ private final TimeRange tr;
- protected Filter filter;
+ private final Filter filter;
/** Keeps track of deletes */
- protected DeleteTracker deletes;
- protected boolean retainDeletesInOutput;
+ private final DeleteTracker deletes;
+
+ /*
+ * The following three booleans define how we deal with deletes.
+ * There are three different aspects:
+ * 1. Whether to keep delete markers. This is used in compactions.
+ * Minor compactions always keep delete markers.
+ * 2. Whether to keep deleted rows. This is also used in compactions,
+ * if the store is set to keep deleted rows. This implies keeping
+ * the delete markers as well.
+ * In this case deleted rows are subject to the normal max version
+ * and TTL/min version rules just like "normal" rows.
+ * 3. Whether a scan can do time travel queries even before a delete
+ * marker, in order to reach deleted rows.
+ */
+ /** whether to retain delete markers */
+ private final boolean retainDeletesInOutput;
+ /** whether to return deleted rows */
+ private final boolean keepDeletedCells;
+ /** whether time range queries can see rows "behind" a delete */
+ private final boolean seePastDeleteMarkers;
+
/** Keeps track of columns and versions */
- protected ColumnTracker columns;
+ private final ColumnTracker columns;
/** Key to seek to in memstore and StoreFiles */
- protected KeyValue startKey;
+ private final KeyValue startKey;
/** Row comparator for the region this query is for */
- KeyValue.KeyComparator rowComparator;
+ private final KeyValue.KeyComparator rowComparator;
+ /* row is left non-private for tests */
/** Row the query is on */
- protected byte [] row;
+ byte [] row;
+
+ /**
+ * Oldest put in any of the involved store files.
+ * Used to decide whether it is OK to delete
+ * a family delete marker when this store keeps
+ * deleted KVs.
+ */
+ private final long earliestPutTs;
/**
- * Constructs a ScanQueryMatcher for a Scan.
+ * Construct a QueryMatcher for a scan
* @param scan
- * @param family
+ * @param scanInfo The store's immutable scan info
* @param columns
- * @param ttl
- * @param rowComparator
+ * @param scanType Type of the scan
+ * @param earliestPutTs Earliest put seen in any of the store files.
*/
- public ScanQueryMatcher(Scan scan, byte [] family,
- NavigableSet<byte[]> columns, long ttl,
- KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions,
- boolean retainDeletesInOutput) {
+ public ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
+ NavigableSet<byte[]> columns, StoreScanner.ScanType scanType,
+ long earliestPutTs) {
this.tr = scan.getTimeRange();
- this.rowComparator = rowComparator;
+ this.rowComparator = scanInfo.getComparator().getRawComparator();
this.deletes = new ScanDeleteTracker();
this.stopRow = scan.getStopRow();
- this.startKey = KeyValue.createFirstOnRow(scan.getStartRow(), family, null);
+ this.startKey = KeyValue.createFirstOnRow(scan.getStartRow(), scanInfo.getFamily(), null);
this.filter = scan.getFilter();
- this.retainDeletesInOutput = retainDeletesInOutput;
+ this.earliestPutTs = earliestPutTs;
+
+ /* how to deal with deletes */
+ // keep deleted cells: if compaction or raw scan
+ this.keepDeletedCells = (scanInfo.getKeepDeletedCells() && scanType != ScanType.USER_SCAN) || scan.isRaw();
+ // retain deletes: if minor compaction or raw scan
+ this.retainDeletesInOutput = scanType == ScanType.MINOR_COMPACT || scan.isRaw();
+ // seePastDeleteMarker: user initiated scans
+ this.seePastDeleteMarkers = scanInfo.getKeepDeletedCells() && scanType == ScanType.USER_SCAN;
+ int maxVersions = Math.min(scan.getMaxVersions(), scanInfo.getMaxVersions());
// Single branch to deal with two types of reads (columns vs all in family)
if (columns == null || columns.size() == 0) {
// use a specialized scan for wildcard column tracker.
- this.columns = new ScanWildcardColumnTracker(minVersions, maxVersions, ttl);
+ this.columns = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersions, scanInfo.getTtl());
} else {
// We can share the ExplicitColumnTracker, diff is we reset
// between rows, not between storefiles.
- this.columns = new ExplicitColumnTracker(columns, minVersions, maxVersions,
- ttl);
+ this.columns = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(), maxVersions,
+ scanInfo.getTtl());
}
}
- public ScanQueryMatcher(Scan scan, byte [] family,
- NavigableSet<byte[]> columns, long ttl,
- KeyValue.KeyComparator rowComparator, int minVersions, int maxVersions) {
- /* By default we will not include deletes */
- /* deletes are included explicitly (for minor compaction) */
- this(scan, family, columns, ttl, rowComparator, minVersions, maxVersions,
- false);
- }
- public ScanQueryMatcher(Scan scan, byte [] family,
- NavigableSet<byte[]> columns, long ttl,
- KeyValue.KeyComparator rowComparator, int maxVersions) {
- this(scan, family, columns, ttl, rowComparator, 0, maxVersions);
+ /*
+ * Constructor for tests
+ */
+ ScanQueryMatcher(Scan scan, Store.ScanInfo scanInfo,
+ NavigableSet<byte[]> columns) {
+ this(scan, scanInfo, columns, StoreScanner.ScanType.USER_SCAN,
+ HConstants.LATEST_TIMESTAMP);
}
/**
@@ -171,21 +205,50 @@ public class ScanQueryMatcher {
return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
}
+ /*
+ * The delete logic is pretty complicated now.
+ * This complexity stems from the following:
+ * 1. The store might be instructed to keep deleted rows around.
+ * 2. A scan can optionally see past a delete marker now.
+ * 3. If deleted rows are kept, we have to find out when we can
+ * remove the delete markers.
+ * 4. Family delete markers are always first (regardless of their TS)
+ * 5. Delete markers should not be counted as a version
+ * 6. Delete markers affect puts of the *same* TS
+ * 7. Delete markers need to be version counted together with the puts
+ * they affect
+ */
byte type = kv.getType();
- if (isDelete(type)) {
- if (tr.withinOrAfterTimeRange(timestamp)) {
- this.deletes.add(bytes, offset, qualLength, timestamp, type);
+ if (kv.isDelete()) {
+ if (!keepDeletedCells) {
+ // first ignore delete markers if the scanner can do so, and the
+ // range does not include the marker
+ boolean includeDeleteMarker = seePastDeleteMarkers ?
+ // +1, to allow a range between a delete and put of same TS
+ tr.withinTimeRange(timestamp+1) :
+ tr.withinOrAfterTimeRange(timestamp);
+ if (includeDeleteMarker) {
+ this.deletes.add(bytes, offset, qualLength, timestamp, type);
+ }
// Can't early out now, because DelFam markers come before any other keys
}
if (retainDeletesInOutput) {
+ // always include
return MatchCode.INCLUDE;
- }
- else {
+ } else if (keepDeletedCells) {
+ if (timestamp < earliestPutTs) {
+ // keeping deleted rows, but there are no puts older than
+ // this delete in the store files.
+ return columns.getNextRowOrNextColumn(bytes, offset, qualLength);
+ }
+ // else: fall through and do version counting on the
+ // delete markers
+ } else {
return MatchCode.SKIP;
}
- }
-
- if (!this.deletes.isEmpty()) {
+ // note the following "else if"...
+ // delete markers are not subject to other delete markers
+ } else if (!this.deletes.isEmpty()) {
DeleteResult deleteResult = deletes.isDeleted(bytes, offset, qualLength,
timestamp);
switch (deleteResult) {
@@ -228,7 +291,8 @@ public class ScanQueryMatcher {
}
}
- MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength, timestamp);
+ MatchCode colChecker = columns.checkColumn(bytes, offset, qualLength,
+ timestamp, type);
/*
* According to current implementation, colChecker can only be
* SEEK_NEXT_COL, SEEK_NEXT_ROW, SKIP or INCLUDE. Therefore, always return
@@ -269,11 +333,6 @@ public class ScanQueryMatcher {
stickyNextRow = false;
}
- // should be in KeyValue.
- protected boolean isDelete(byte type) {
- return (type != KeyValue.Type.Put.getCode());
- }
-
/**
*
* @return the start key
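To restate the constructor's flag derivation in one place, a minimal sketch (not part of the patch; the names mirror the fields above):

    static void printDeleteFlags(boolean cfKeepsDeletedCells,
        StoreScanner.ScanType type, boolean rawScan) {
      // deleted cells stay visible during compactions of a keeping CF, or any raw scan
      boolean keepDeletedCells =
          (cfKeepsDeletedCells && type != StoreScanner.ScanType.USER_SCAN) || rawScan;
      // delete markers are written to the output of minor compactions and raw scans
      boolean retainDeletesInOutput =
          type == StoreScanner.ScanType.MINOR_COMPACT || rawScan;
      // only user scans on a keeping CF may time travel behind a delete marker
      boolean seePastDeleteMarkers =
          cfKeepsDeletedCells && type == StoreScanner.ScanType.USER_SCAN;
      System.out.println(keepDeletedCells + " " + retainDeletesInOutput
          + " " + seePastDeleteMarkers);
    }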
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/ScanWildcardColumnTracker.java Fri Oct 21 19:57:07 2011
@@ -22,9 +22,8 @@ package org.apache.hadoop.hbase.regionse
import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.util.Bytes;
@@ -32,17 +31,17 @@ import org.apache.hadoop.hbase.util.Byte
* Keeps track of the columns for a scan if they are not explicitly specified
*/
public class ScanWildcardColumnTracker implements ColumnTracker {
- private static final Log LOG =
- LogFactory.getLog(ScanWildcardColumnTracker.class);
private byte [] columnBuffer = null;
private int columnOffset = 0;
private int columnLength = 0;
private int currentCount = 0;
private int maxVersions;
private int minVersions;
- /* Keeps track of the latest timestamp included for current column.
+ /* Keeps track of the latest timestamp and type included for current column.
* Used to eliminate duplicates. */
private long latestTSOfCurrentColumn;
+ private byte latestTypeOfCurrentColumn;
+
private long oldestStamp;
/**
@@ -58,41 +57,38 @@ public class ScanWildcardColumnTracker i
}
/**
- * Can only return INCLUDE or SKIP, since returning "NEXT" or
- * "DONE" would imply we have finished with this row, when
- * this class can't figure that out.
- *
- * @param bytes
- * @param offset
- * @param length
- * @param timestamp
- * @return The match code instance.
+ * {@inheritDoc}
+ * This receives puts *and* deletes.
+ * Deletes do not count as a version, but rather take the version
+ * of the previous put (so eventually all but the last can be reclaimed).
*/
@Override
public MatchCode checkColumn(byte[] bytes, int offset, int length,
- long timestamp) throws IOException {
+ long timestamp, byte type) throws IOException {
+
if (columnBuffer == null) {
// first iteration.
resetBuffer(bytes, offset, length);
- return checkVersion(++currentCount, timestamp);
+ // do not count a delete marker as another version
+ return checkVersion(type, timestamp);
}
int cmp = Bytes.compareTo(bytes, offset, length,
columnBuffer, columnOffset, columnLength);
if (cmp == 0) {
//If column matches, check if it is a duplicate timestamp
- if (sameAsPreviousTS(timestamp)) {
+ if (sameAsPreviousTSAndType(timestamp, type)) {
return ScanQueryMatcher.MatchCode.SKIP;
}
- return checkVersion(++currentCount, timestamp);
+ return checkVersion(type, timestamp);
}
- resetTS();
+ resetTSAndType();
// new col > old col
if (cmp > 0) {
// switched columns, let's do something.
resetBuffer(bytes, offset, length);
- return checkVersion(++currentCount, timestamp);
+ return checkVersion(type, timestamp);
}
// new col < oldcol
@@ -112,13 +108,25 @@ public class ScanWildcardColumnTracker i
currentCount = 0;
}
- private MatchCode checkVersion(int version, long timestamp) {
- if (version > maxVersions) {
+ /**
+ * Check whether this version should be retained.
+ * Four factors are considered: the version count, max versions,
+ * min versions, and the TTL.
+ * If this version is past max versions -> skip it.
+ * If this kv has expired or was deleted, check min versions
+ * to decide whether to skip it or not.
+ *
+ * Increase the version counter unless this is a delete
+ */
+ private MatchCode checkVersion(byte type, long timestamp) {
+ if (!KeyValue.isDelete(type)) {
+ currentCount++;
+ }
+ if (currentCount > maxVersions) {
return ScanQueryMatcher.MatchCode.SEEK_NEXT_COL; // skip to next col
}
// keep the KV if required by minversions or it is not expired, yet
- if (version <= minVersions || !isExpired(timestamp)) {
- setTS(timestamp);
+ if (currentCount <= minVersions || !isExpired(timestamp)) {
+ setTSAndType(timestamp, type);
return ScanQueryMatcher.MatchCode.INCLUDE;
} else {
return MatchCode.SEEK_NEXT_COL;
@@ -136,19 +144,21 @@ public class ScanWildcardColumnTracker i
@Override
public void reset() {
columnBuffer = null;
- resetTS();
+ resetTSAndType();
}
- private void resetTS() {
+ private void resetTSAndType() {
latestTSOfCurrentColumn = HConstants.LATEST_TIMESTAMP;
+ latestTypeOfCurrentColumn = 0;
}
- private void setTS(long timestamp) {
+ private void setTSAndType(long timestamp, byte type) {
latestTSOfCurrentColumn = timestamp;
+ latestTypeOfCurrentColumn = type;
}
- private boolean sameAsPreviousTS(long timestamp) {
- return timestamp == latestTSOfCurrentColumn;
+ private boolean sameAsPreviousTSAndType(long timestamp, byte type) {
+ return timestamp == latestTSOfCurrentColumn && type == latestTypeOfCurrentColumn;
}
private boolean isExpired(long timestamp) {
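To make the counting rule concrete, a hedged trace (not part of the patch; a wildcard tracker with minVersions = 0, maxVersions = 2, and no TTL pressure). Delete markers sort ahead of puts at the same timestamp and never advance the version counter:

    ScanWildcardColumnTracker t =
        new ScanWildcardColumnTracker(0, 2, Long.MAX_VALUE);
    byte[] q = Bytes.toBytes("q");
    byte del = KeyValue.Type.Delete.getCode();
    byte put = KeyValue.Type.Put.getCode();
    // checkColumn declares IOException; calls shown in scan order
    t.checkColumn(q, 0, q.length, 7, del); // INCLUDE, counter still 0
    t.checkColumn(q, 0, q.length, 7, put); // INCLUDE, counter = 1
    t.checkColumn(q, 0, q.length, 6, put); // INCLUDE, counter = 2
    t.checkColumn(q, 0, q.length, 5, put); // SEEK_NEXT_COL, past maxVersions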
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/Store.java Fri Oct 21 19:57:07 2011
@@ -41,6 +41,7 @@ import org.apache.hadoop.hbase.HColumnDe
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.KeyValue.KVComparator;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HeapSize;
@@ -49,6 +50,7 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
+import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionProgress;
import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
@@ -95,9 +97,7 @@ public class Store implements HeapSize {
final Configuration conf;
final CacheConfig cacheConf;
// ttl in milliseconds.
- protected long ttl;
- protected int minVersions;
- protected int maxVersions;
+ private long ttl;
long majorCompactionTime;
private final int minFilesToCompact;
private final int maxFilesToCompact;
@@ -119,6 +119,8 @@ public class Store implements HeapSize {
private CompactionProgress progress;
private final int compactionKVMax;
+ // not private for testing
/* package */ ScanInfo scanInfo;
/*
* List of store files inside this store. This is an immutable list that
* is atomically replaced when its contents change.
@@ -183,8 +185,9 @@ public class Store implements HeapSize {
// second -> ms adjust for user data
this.ttl *= 1000;
}
- this.minVersions = family.getMinVersions();
- this.maxVersions = family.getMaxVersions();
+ scanInfo = new ScanInfo(family.getName(), family.getMinVersions(),
+ family.getMaxVersions(), ttl, family.getKeepDeletedCells(),
+ this.comparator);
this.memstore = new MemStore(conf, this.comparator);
this.storeNameStr = Bytes.toString(this.family.getName());
@@ -477,13 +480,13 @@ public class Store implements HeapSize {
return null;
}
Scan scan = new Scan();
- scan.setMaxVersions(maxVersions);
+ scan.setMaxVersions(scanInfo.getMaxVersions());
// Use a store scanner to find which rows to flush.
// Note that we need to retain deletes, hence
- // pass true as the StoreScanner's retainDeletesInOutput argument.
- InternalScanner scanner = new StoreScanner(this, scan,
- Collections.singletonList(new CollectionBackedScanner(set,
- this.comparator)), true);
+ // treat this as a minor compaction.
+ InternalScanner scanner = new StoreScanner(this, scan, Collections
+ .singletonList(new CollectionBackedScanner(set, this.comparator)),
+ ScanType.MINOR_COMPACT, HConstants.OLDEST_TIMESTAMP);
try {
// TODO: We can fail in the below block before we complete adding this
// flush to list of store files. Add cleanup of anything put on filesystem
@@ -1108,6 +1111,7 @@ public class Store implements HeapSize {
throws IOException {
// calculate maximum key count after compaction (for blooms)
int maxKeyCount = 0;
+ long earliestPutTs = HConstants.LATEST_TIMESTAMP;
for (StoreFile file : filesToCompact) {
StoreFile.Reader r = file.getReader();
if (r != null) {
@@ -1123,6 +1127,19 @@ public class Store implements HeapSize {
", size=" + StringUtils.humanReadableInt(r.length()) );
}
}
+ // For major compactions calculate the earliest put timestamp
+ // of all involved storefiles. This is used to remove
+ // family delete markers during the compaction.
+ if (majorCompaction) {
+ byte[] tmp = r.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS);
+ if (tmp == null) {
+ // there's a file with no information, must be an old one
+ // assume we have very old puts
+ earliestPutTs = HConstants.OLDEST_TIMESTAMP;
+ } else {
+ earliestPutTs = Math.min(earliestPutTs, Bytes.toLong(tmp));
+ }
+ }
}
// keep track of compaction progress
@@ -1141,7 +1158,9 @@ public class Store implements HeapSize {
Scan scan = new Scan();
scan.setMaxVersions(family.getMaxVersions());
/* include deletes, unless we are doing a major compaction */
- scanner = new StoreScanner(this, scan, scanners, !majorCompaction);
+ scanner = new StoreScanner(this, scan, scanners,
+ majorCompaction ? ScanType.MAJOR_COMPACT : ScanType.MINOR_COMPACT,
+ earliestPutTs);
if (region.getCoprocessorHost() != null) {
InternalScanner cpScanner = region.getCoprocessorHost().preCompact(
this, scanner);
@@ -1374,7 +1393,7 @@ public class Store implements HeapSize {
// at all (expired or not) has at least one version that will not expire.
// Note that this method used to take a KeyValue as arguments. KeyValue
// can be back-dated, a row key cannot.
- long ttlToUse = this.minVersions > 0 ? Long.MAX_VALUE : this.ttl;
+ long ttlToUse = scanInfo.getMinVersions() > 0 ? Long.MAX_VALUE : this.ttl;
KeyValue kv = new KeyValue(row, HConstants.LATEST_TIMESTAMP);
@@ -1842,15 +1861,16 @@ public class Store implements HeapSize {
return this.cacheConf;
}
- public static final long FIXED_OVERHEAD = ClassSize.align(
- ClassSize.OBJECT + (17 * ClassSize.REFERENCE) +
- (7 * Bytes.SIZEOF_LONG) + (1 * Bytes.SIZEOF_DOUBLE) +
- (7 * Bytes.SIZEOF_INT) + (1 * Bytes.SIZEOF_BOOLEAN));
-
- public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD +
- ClassSize.OBJECT + ClassSize.REENTRANT_LOCK +
- ClassSize.CONCURRENT_SKIPLISTMAP +
- ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT);
+ public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ + (18 * ClassSize.REFERENCE) + (7 * Bytes.SIZEOF_LONG)
+ + (1 * Bytes.SIZEOF_DOUBLE) + (5 * Bytes.SIZEOF_INT)
+ + Bytes.SIZEOF_BOOLEAN);
+
+ public static final long DEEP_OVERHEAD = ClassSize.align(FIXED_OVERHEAD
+ + ClassSize.OBJECT + ClassSize.REENTRANT_LOCK
+ + ClassSize.CONCURRENT_SKIPLISTMAP
+ + ClassSize.CONCURRENT_SKIPLISTMAP_ENTRY + ClassSize.OBJECT
+ + ScanInfo.FIXED_OVERHEAD);
@Override
public long heapSize() {
@@ -1861,4 +1881,62 @@ public class Store implements HeapSize {
return comparator;
}
+ /**
+ * Immutable information for scans over a store.
+ */
+ public static class ScanInfo {
+ private byte[] family;
+ private int minVersions;
+ private int maxVersions;
+ private long ttl;
+ private boolean keepDeletedCells;
+ private KVComparator comparator;
+
+ public static final long FIXED_OVERHEAD = ClassSize.align(ClassSize.OBJECT
+ + (2 * ClassSize.REFERENCE) + (2 * Bytes.SIZEOF_INT)
+ + Bytes.SIZEOF_LONG + Bytes.SIZEOF_BOOLEAN);
+
+ /**
+ * @param family Name of this store's column family
+ * @param minVersions Store's MIN_VERSIONS setting
+ * @param maxVersions Store's VERSIONS setting
+ * @param ttl Store's TTL (in ms)
+ * @param keepDeletedCells Store's keepDeletedCells setting
+ * @param comparator The store's comparator
+ */
+ public ScanInfo(byte[] family, int minVersions, int maxVersions, long ttl,
+ boolean keepDeletedCells, KVComparator comparator) {
+
+ this.family = family;
+ this.minVersions = minVersions;
+ this.maxVersions = maxVersions;
+ this.ttl = ttl;
+ this.keepDeletedCells = keepDeletedCells;
+ this.comparator = comparator;
+ }
+
+ public byte[] getFamily() {
+ return family;
+ }
+
+ public int getMinVersions() {
+ return minVersions;
+ }
+
+ public int getMaxVersions() {
+ return maxVersions;
+ }
+
+ public long getTtl() {
+ return ttl;
+ }
+
+ public boolean getKeepDeletedCells() {
+ return keepDeletedCells;
+ }
+
+ public KVComparator getComparator() {
+ return comparator;
+ }
+ }
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreFile.java Fri Oct 21 19:57:07 2011
@@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
@@ -116,6 +117,9 @@ public class StoreFile {
/** Key for Timerange information in metadata*/
public static final byte[] TIMERANGE_KEY = Bytes.toBytes("TIMERANGE");
+ /** Key for timestamp of earliest-put in metadata*/
+ public static final byte[] EARLIEST_PUT_TS = Bytes.toBytes("EARLIEST_PUT_TS");
+
// Make default block size for StoreFiles 8k while testing. TODO: FIX!
// Need to make it 8k for testing.
public static final int DEFAULT_BLOCKSIZE_SMALL = 8 * 1024;
@@ -737,6 +741,7 @@ public class StoreFile {
private int lastBloomKeyOffset, lastBloomKeyLen;
private KVComparator kvComparator;
private KeyValue lastKv = null;
+ private long earliestPutTs = HConstants.LATEST_TIMESTAMP;
TimeRangeTracker timeRangeTracker = new TimeRangeTracker();
/* isTimeRangeTrackerSet keeps track if the timeRange has already been set
@@ -796,14 +801,15 @@ public class StoreFile {
writer.appendFileInfo(MAX_SEQ_ID_KEY, Bytes.toBytes(maxSequenceId));
writer.appendFileInfo(MAJOR_COMPACTION_KEY,
Bytes.toBytes(majorCompaction));
- appendTimeRangeMetadata();
+ appendTrackedTimestampsToMetadata();
}
/**
- * Add TimestampRange to Metadata
+ * Add TimestampRange and earliest put timestamp to Metadata
*/
- public void appendTimeRangeMetadata() throws IOException {
+ public void appendTrackedTimestampsToMetadata() throws IOException {
appendFileInfo(TIMERANGE_KEY,WritableUtils.toByteArray(timeRangeTracker));
+ appendFileInfo(EARLIEST_PUT_TS, Bytes.toBytes(earliestPutTs));
}
/**
@@ -816,26 +822,19 @@ public class StoreFile {
}
/**
+ * Record the earliest Put timestamp.
+ *
* If the timeRangeTracker is not set,
* update TimeRangeTracker to include the timestamp of this key
* @param kv
* @throws IOException
*/
- public void includeInTimeRangeTracker(final KeyValue kv) {
- if (!isTimeRangeTrackerSet) {
- timeRangeTracker.includeTimestamp(kv);
+ public void trackTimestamps(final KeyValue kv) {
+ if (KeyValue.Type.Put.getCode() == kv.getType()) {
+ earliestPutTs = Math.min(earliestPutTs, kv.getTimestamp());
}
- }
-
- /**
- * If the timeRangeTracker is not set,
- * update TimeRangeTracker to include the timestamp of this key
- * @param key
- * @throws IOException
- */
- public void includeInTimeRangeTracker(final byte [] key) {
if (!isTimeRangeTrackerSet) {
- timeRangeTracker.includeTimestamp(key);
+ timeRangeTracker.includeTimestamp(kv);
}
}
@@ -908,7 +907,7 @@ public class StoreFile {
}
}
writer.append(kv);
- includeInTimeRangeTracker(kv);
+ trackTimestamps(kv);
}
public Path getPath() {
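A short sketch of the metadata flow (not part of the patch; "writer", "kv", "maxSequenceId", and "isMajorCompaction" are hypothetical):

    // Every appended KeyValue passes through trackTimestamps(), so only
    // Puts can lower earliestPutTs; delete markers never do.
    writer.append(kv);
    // appendMetadata() persists EARLIEST_PUT_TS next to the time range;
    // major compactions read it back via
    // reader.loadFileInfo().get(StoreFile.EARLIEST_PUT_TS) (see Store.java above).
    writer.appendMetadata(maxSequenceId, isMajorCompaction);
    writer.close();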
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Fri Oct 21 19:57:07 2011
@@ -20,9 +20,10 @@
package org.apache.hadoop.hbase.regionserver;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
@@ -86,12 +87,14 @@ class StoreScanner extends NonLazyKeyVal
* @throws IOException
*/
StoreScanner(Store store, Scan scan, final NavigableSet<byte[]> columns)
- throws IOException {
+ throws IOException {
this(store, scan.getCacheBlocks(), scan, columns);
- matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
- columns, store.ttl, store.comparator.getRawComparator(),
- store.minVersions, store.versionsToReturn(scan.getMaxVersions()),
- false);
+ if (columns != null && scan.isRaw()) {
+ throw new DoNotRetryIOException(
+ "Cannot specify any column for a raw scan");
+ }
+ matcher = new ScanQueryMatcher(scan, store.scanInfo, columns,
+ ScanType.USER_SCAN, HConstants.LATEST_TIMESTAMP);
// Pass columns to try to filter out unnecessary StoreFiles.
List<KeyValueScanner> scanners = getScanners(scan, columns);
@@ -124,12 +127,12 @@ class StoreScanner extends NonLazyKeyVal
* @param scan the spec
* @param scanners ancillary scanners
*/
- StoreScanner(Store store, Scan scan, List<? extends KeyValueScanner> scanners,
- boolean retainDeletesInOutput) throws IOException {
+ StoreScanner(Store store, Scan scan,
+ List<? extends KeyValueScanner> scanners, ScanType scanType,
+ long earliestPutTs) throws IOException {
this(store, false, scan, null);
- matcher = new ScanQueryMatcher(scan, store.getFamily().getName(),
- null, store.ttl, store.comparator.getRawComparator(), store.minVersions,
- store.versionsToReturn(scan.getMaxVersions()), retainDeletesInOutput);
+ matcher = new ScanQueryMatcher(scan, store.scanInfo, null, scanType,
+ earliestPutTs);
// Seek all scanners to the initial key
for(KeyValueScanner scanner : scanners) {
@@ -141,20 +144,18 @@ class StoreScanner extends NonLazyKeyVal
}
// Constructor for testing.
- StoreScanner(final Scan scan, final byte [] colFamily, final long ttl,
- final KeyValue.KVComparator comparator,
- final NavigableSet<byte[]> columns,
- final List<KeyValueScanner> scanners)
- throws IOException {
+ StoreScanner(final Scan scan, Store.ScanInfo scanInfo,
+ StoreScanner.ScanType scanType, final NavigableSet<byte[]> columns,
+ final List<KeyValueScanner> scanners) throws IOException {
this(null, scan.getCacheBlocks(), scan, columns);
- this.matcher = new ScanQueryMatcher(scan, colFamily, columns, ttl,
- comparator.getRawComparator(), 0, scan.getMaxVersions(), false);
+ this.matcher = new ScanQueryMatcher(scan, scanInfo, columns, scanType,
+ HConstants.LATEST_TIMESTAMP);
// Seek all scanners to the initial key
- for(KeyValueScanner scanner : scanners) {
+ for (KeyValueScanner scanner : scanners) {
scanner.seek(matcher.getStartKey());
}
- heap = new KeyValueHeap(scanners, comparator);
+ heap = new KeyValueHeap(scanners, scanInfo.getComparator());
}
/*
@@ -476,5 +477,13 @@ class StoreScanner extends NonLazyKeyVal
lazySeekEnabledGlobally = enable;
}
+ /**
+ * Enum to distinguish general scan types.
+ */
+ public static enum ScanType {
+ MAJOR_COMPACT,
+ MINOR_COMPACT,
+ USER_SCAN
+ }
}
Modified: hbase/trunk/src/main/ruby/hbase/admin.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/hbase/admin.rb?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/hbase/admin.rb (original)
+++ hbase/trunk/src/main/ruby/hbase/admin.rb Fri Oct 21 19:57:07 2011
@@ -463,6 +463,7 @@ module Hbase
family.setBlocksize(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOCKSIZE)
family.setMaxVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::VERSIONS)
family.setMinVersions(JInteger.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::MIN_VERSIONS)
+ family.setKeepDeletedCells(JBoolean.valueOf(arg[org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS])) if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::KEEP_DELETED_CELLS)
if arg.include?(org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER)
bloomtype = arg[org.apache.hadoop.hbase.HColumnDescriptor::BLOOMFILTER].upcase
unless org.apache.hadoop.hbase.regionserver.StoreFile::BloomType.constants.include?(bloomtype)
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/HBaseTestCase.java Fri Oct 21 19:57:07 2011
@@ -198,7 +198,7 @@ public abstract class HBaseTestCase exte
protected HTableDescriptor createTableDescriptor(final String name,
final int versions) {
return createTableDescriptor(name, HColumnDescriptor.DEFAULT_MIN_VERSIONS,
- versions, HConstants.FOREVER);
+ versions, HConstants.FOREVER, HColumnDescriptor.DEFAULT_KEEP_DELETED);
}
/**
@@ -209,21 +209,21 @@ public abstract class HBaseTestCase exte
* @return Column descriptor.
*/
protected HTableDescriptor createTableDescriptor(final String name,
- final int minVersions, final int versions, final int ttl) {
+ final int minVersions, final int versions, final int ttl, boolean keepDeleted) {
HTableDescriptor htd = new HTableDescriptor(name);
htd.addFamily(new HColumnDescriptor(fam1, minVersions, versions,
- HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
- HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
- HColumnDescriptor.DEFAULT_BLOOMFILTER,
- HConstants.REPLICATION_SCOPE_LOCAL));
+ keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
+ HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
+ HColumnDescriptor.DEFAULT_BLOOMFILTER,
+ HConstants.REPLICATION_SCOPE_LOCAL));
htd.addFamily(new HColumnDescriptor(fam2, minVersions, versions,
- HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
+ keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
HColumnDescriptor.DEFAULT_BLOOMFILTER,
HConstants.REPLICATION_SCOPE_LOCAL));
htd.addFamily(new HColumnDescriptor(fam3, minVersions, versions,
- HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
- HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
+ keepDeleted, HColumnDescriptor.DEFAULT_COMPRESSION, false, false,
+ HColumnDescriptor.DEFAULT_BLOCKSIZE, ttl,
HColumnDescriptor.DEFAULT_BLOOMFILTER,
HConstants.REPLICATION_SCOPE_LOCAL));
return htd;
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java Fri Oct 21 19:57:07 2011
@@ -20,6 +20,7 @@
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
@@ -71,10 +72,8 @@ import org.apache.hadoop.hbase.io.hfile.
import org.apache.hadoop.hbase.io.hfile.CacheConfig;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.Store;
-import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.DataInputBuffer;
-import org.apache.hadoop.metrics.util.MetricsTimeVaryingLong;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.junit.After;
import org.junit.AfterClass;
@@ -129,6 +128,82 @@ public class TestFromClientSide {
}
/**
+ * Basic client side validation of HBASE-4536
+ */
+ @Test
+ public void testKeepDeletedCells() throws Exception {
+ final byte[] TABLENAME = Bytes.toBytes("testKeepDeletedCells");
+ final byte[] FAMILY = Bytes.toBytes("family");
+ final byte[] C0 = Bytes.toBytes("c0");
+
+ final byte[] T1 = Bytes.toBytes("T1");
+ final byte[] T2 = Bytes.toBytes("T2");
+ final byte[] T3 = Bytes.toBytes("T3");
+ HColumnDescriptor hcd = new HColumnDescriptor(FAMILY,
+ HColumnDescriptor.DEFAULT_MIN_VERSIONS,
+ HColumnDescriptor.DEFAULT_VERSIONS,
+ true,
+ HColumnDescriptor.DEFAULT_COMPRESSION,
+ HColumnDescriptor.DEFAULT_IN_MEMORY,
+ HColumnDescriptor.DEFAULT_BLOCKCACHE,
+ HColumnDescriptor.DEFAULT_BLOCKSIZE,
+ HColumnDescriptor.DEFAULT_TTL,
+ HColumnDescriptor.DEFAULT_BLOOMFILTER,
+ HColumnDescriptor.DEFAULT_REPLICATION_SCOPE);
+
+ HTableDescriptor desc = new HTableDescriptor(TABLENAME);
+ desc.addFamily(hcd);
+ TEST_UTIL.getHBaseAdmin().createTable(desc);
+ Configuration c = TEST_UTIL.getConfiguration();
+ HTable h = new HTable(c, TABLENAME);
+
+ long ts = System.currentTimeMillis();
+ Put p = new Put(T1, ts);
+ p.add(FAMILY, C0, T1);
+ h.put(p);
+ p = new Put(T1, ts+2);
+ p.add(FAMILY, C0, T2);
+ h.put(p);
+ p = new Put(T1, ts+4);
+ p.add(FAMILY, C0, T3);
+ h.put(p);
+
+ Delete d = new Delete(T1, ts+2, null);
+ h.delete(d);
+
+ d = new Delete(T1, ts+3, null);
+ d.deleteColumns(FAMILY, C0, ts+3);
+ h.delete(d);
+
+ Get g = new Get(T1);
+ // does *not* include the delete
+ g.setTimeRange(0, ts+3);
+ Result r = h.get(g);
+ assertArrayEquals(T2, r.getValue(FAMILY, C0));
+
+ Scan s = new Scan(T1);
+ s.setTimeRange(0, ts+3);
+ s.setMaxVersions();
+ ResultScanner scanner = h.getScanner(s);
+ KeyValue[] kvs = scanner.next().raw();
+ assertArrayEquals(T2, kvs[0].getValue());
+ assertArrayEquals(T1, kvs[1].getValue());
+ scanner.close();
+
+ s = new Scan(T1);
+ s.setRaw(true);
+ s.setMaxVersions();
+ scanner = h.getScanner(s);
+ kvs = scanner.next().raw();
+ assertTrue(kvs[0].isDeleteFamily());
+ assertArrayEquals(T3, kvs[1].getValue());
+ assertTrue(kvs[2].isDelete());
+ assertArrayEquals(T2, kvs[3].getValue());
+ assertArrayEquals(T1, kvs[4].getValue());
+ scanner.close();
+ }
+
+ /**
* HBASE-2468 use case 1 and 2: region info de/serialization
*/
@Test
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestCompaction.java Fri Oct 21 19:57:07 2011
@@ -24,13 +24,8 @@ import static org.mockito.Mockito.spy;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -240,11 +235,15 @@ public class TestCompaction extends HBas
// Multiple versions allowed for an entry, so the delete isn't enough
// Lower TTL and expire to ensure that all our entries have been wiped
- final int ttlInSeconds = 1;
+ final int ttl = 1000;
for (Store store: this.r.stores.values()) {
- store.ttl = ttlInSeconds * 1000;
+ Store.ScanInfo old = store.scanInfo;
+ Store.ScanInfo si = new Store.ScanInfo(old.getFamily(),
+ old.getMinVersions(), old.getMaxVersions(), ttl,
+ old.getKeepDeletedCells(), old.getComparator());
+ store.scanInfo = si;
}
- Thread.sleep(ttlInSeconds * 1000);
+ Thread.sleep(1000);
r.compactStores(true);
int count = count();
@@ -446,11 +445,15 @@ public class TestCompaction extends HBas
// Multiple versions allowed for an entry, so the delete isn't enough
// Lower TTL and expire to ensure that all our entries have been wiped
- final int ttlInSeconds = 1;
+ final int ttl = 1000;
for (Store store: this.r.stores.values()) {
- store.ttl = ttlInSeconds * 1000;
+ Store.ScanInfo old = store.scanInfo;
+ Store.ScanInfo si = new Store.ScanInfo(old.getFamily(),
+ old.getMinVersions(), old.getMaxVersions(), ttl,
+ old.getKeepDeletedCells(), old.getComparator());
+ store.scanInfo = si;
}
- Thread.sleep(ttlInSeconds * 1000);
+ Thread.sleep(ttl);
r.compactStores(true);
assertEquals(0, count());
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestExplicitColumnTracker.java Fri Oct 21 19:57:07 2011
@@ -27,6 +27,7 @@ import java.util.TreeSet;
import java.util.Arrays;
import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode;
import org.apache.hadoop.hbase.util.Bytes;
@@ -54,7 +55,8 @@ public class TestExplicitColumnTracker e
long timestamp = 0;
//"Match"
for(byte [] col : scannerColumns){
- result.add(exp.checkColumn(col, 0, col.length, ++timestamp));
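+ // checkColumn now also receives the KeyValue type code, so the
+ // tracker can distinguish puts from delete markers: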
+ result.add(exp.checkColumn(col, 0, col.length, ++timestamp,
+ KeyValue.Type.Put.getCode()));
}
assertEquals(expected.size(), result.size());
@@ -166,13 +168,13 @@ public class TestExplicitColumnTracker e
Long.MAX_VALUE);
for (int i = 0; i < 100000; i+=2) {
byte [] col = Bytes.toBytes("col"+i);
- explicit.checkColumn(col, 0, col.length, 1);
+ explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode());
}
explicit.update();
for (int i = 1; i < 100000; i+=2) {
byte [] col = Bytes.toBytes("col"+i);
- explicit.checkColumn(col, 0, col.length, 1);
+ explicit.checkColumn(col, 0, col.length, 1, KeyValue.Type.Put.getCode());
}
}
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java?rev=1187531&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestKeepDeletes.java Fri Oct 21 19:57:07 2011
@@ -0,0 +1,725 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+
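+// A minimal sketch of how a client would enable the new flag when
+// creating a table, assuming the setter added to HColumnDescriptor by
+// this patch (hcd/htd as in the usual Admin table-creation flow):
+//
+//   HColumnDescriptor hcd = new HColumnDescriptor(FAMILY);
+//   hcd.setKeepDeletedCells(true); // retain cells masked by delete markers
+//   hcd.setMaxVersions(3);
+//   htd.addFamily(hcd);
+//
+// The shell equivalent (attribute as registered in admin.rb by this
+// change) would roughly be:
+//   create 't1', {NAME => 'f1', VERSIONS => 3, KEEP_DELETED_CELLS => true}
+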
+public class TestKeepDeletes extends HBaseTestCase {
+ private final byte[] T0 = Bytes.toBytes("0");
+ private final byte[] T1 = Bytes.toBytes("1");
+ private final byte[] T2 = Bytes.toBytes("2");
+ private final byte[] T3 = Bytes.toBytes("3");
+ private final byte[] T4 = Bytes.toBytes("4");
+ private final byte[] T5 = Bytes.toBytes("5");
+ private final byte[] T6 = Bytes.toBytes("6");
+
+ private final byte[] c0 = COLUMNS[0];
+ private final byte[] c1 = COLUMNS[1];
+
+ /**
+ * Make sure that deleted rows are retained:
+ * family delete markers are eventually collected,
+ * column delete markers are versioned, and
+ * time range scans of deleted rows are possible.
+ */
+ public void testBasicScenario() throws Exception {
+ // keep 3 versions, rows do not expire
+ HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
+ HConstants.FOREVER, true);
+ HRegion region = createNewHRegion(htd, null, null);
+
+ long ts = System.currentTimeMillis();
+ Put p = new Put(T1, ts);
+ p.add(c0, c0, T1);
+ region.put(p);
+ p = new Put(T1, ts+1);
+ p.add(c0, c0, T2);
+ region.put(p);
+ p = new Put(T1, ts+2);
+ p.add(c0, c0, T3);
+ region.put(p);
+ p = new Put(T1, ts+4);
+ p.add(c0, c0, T4);
+ region.put(p);
+
+ // now place a delete marker at ts+2
+ Delete d = new Delete(T1, ts+2, null);
+ region.delete(d, null, true);
+
+ // a raw scan can see the delete markers
+ // (one for each column family)
+ assertEquals(3, countDeleteMarkers(region));
+
+ // get something *before* the delete marker
+ Get g = new Get(T1);
+ g.setMaxVersions();
+ g.setTimeRange(0L, ts+2);
+ Result r = region.get(g, null);
+ checkResult(r, c0, c0, T2,T1);
+
+ // flush
+ region.flushcache();
+
+ // yep, T2 still there, T1 gone
+ r = region.get(g, null);
+ checkResult(r, c0, c0, T2);
+
+ // major compact
+ region.compactStores(true);
+ region.compactStores(true);
+
+ // one delete marker left (the others did not
+ // have older puts)
+ assertEquals(1, countDeleteMarkers(region));
+
+ // still there (even after multiple compactions)
+ r = region.get(g, null);
+ checkResult(r, c0, c0, T2);
+
+ // a time range that includes the delete marker won't see past rows
+ g.setTimeRange(0L, ts+4);
+ r = region.get(g, null);
+ assertTrue(r.isEmpty());
+
+ // two more puts, this will expire the older puts.
+ p = new Put(T1, ts+5);
+ p.add(c0, c0, T5);
+ region.put(p);
+ p = new Put(T1, ts+6);
+ p.add(c0, c0, T6);
+ region.put(p);
+
+ // also add an old put again
+ // (which is past the max versions)
+ p = new Put(T1, ts);
+ p.add(c0, c0, T1);
+ region.put(p);
+ r = region.get(g, null);
+ assertTrue(r.isEmpty());
+
+ region.flushcache();
+ region.compactStores(true);
+ region.compactStores(true);
+
+ // verify that the delete marker itself was collected
+ region.put(p);
+ r = region.get(g, null);
+ checkResult(r, c0, c0, T1);
+ assertEquals(0, countDeleteMarkers(region));
+ }
+
+ /**
+ * Even when the store does not keep deletes, a "raw" scan will
+ * return everything it can find (unless discarding cells is guaranteed
+ * to have no effect).
+ * Assuming this is the desired behavior. We could also disallow "raw"
+ * scanning if the store does not have KEEP_DELETED_CELLS enabled
+ * (this can be changed easily).
+ * A client-side sketch of a raw scan follows this test.
+ */
+ public void testRawScanWithoutKeepingDeletes() throws Exception {
+ // KEEP_DELETED_CELLS is NOT enabled
+ HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
+ HConstants.FOREVER, false);
+ HRegion region = createNewHRegion(htd, null, null);
+
+ long ts = System.currentTimeMillis();
+ Put p = new Put(T1, ts);
+ p.add(c0, c0, T1);
+ region.put(p);
+
+ Delete d = new Delete(T1, ts, null);
+ d.deleteColumn(c0, c0, ts);
+ region.delete(d, null, true);
+
+ // scan still returns delete markers and deleted rows
+ Scan s = new Scan();
+ s.setRaw(true);
+ s.setMaxVersions();
+ InternalScanner scan = region.getScanner(s);
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ scan.next(kvs);
+ assertEquals(2, kvs.size());
+
+ region.flushcache();
+ region.compactStores(true);
+
+ // after compaction they are gone
+ // (note that this is a test with a Store without
+ // KEEP_DELETED_CELLS)
+ s = new Scan();
+ s.setRaw(true);
+ s.setMaxVersions();
+ scan = region.getScanner(s);
+ kvs = new ArrayList<KeyValue>();
+ scan.next(kvs);
+ assertTrue(kvs.isEmpty());
+ }
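+
+ // Client-side, the equivalent raw scan would look like the sketch
+ // below, using the Scan#setRaw flag added by this patch ("table" is
+ // assumed to be an HTable for the table under test):
+ //
+ //   Scan raw = new Scan();
+ //   raw.setRaw(true); // also return delete markers and masked cells
+ //   raw.setMaxVersions();
+ //   ResultScanner rs = table.getScanner(raw);
+ //   for (Result rr : rs) {
+ //     for (KeyValue kv : rr.raw()) {
+ //       System.out.println(kv + (kv.isDelete() ? " <- delete marker" : ""));
+ //     }
+ //   }
+ //   rs.close();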
+
+ /**
+ * basic verification of existing behavior
+ */
+ public void testWithoutKeepingDeletes() throws Exception {
+ // KEEP_DELETED_CELLS is NOT enabled
+ HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
+ HConstants.FOREVER, false);
+ HRegion region = createNewHRegion(htd, null, null);
+
+ long ts = System.currentTimeMillis();
+ Put p = new Put(T1, ts);
+ p.add(c0, c0, T1);
+ region.put(p);
+ Delete d = new Delete(T1, ts+2, null);
+ d.deleteColumn(c0, c0, ts);
+ region.delete(d, null, true);
+
+ // "past" get does not see rows behind delete marker
+ Get g = new Get(T1);
+ g.setMaxVersions();
+ g.setTimeRange(0L, ts+1);
+ Result r = region.get(g, null);
+ assertTrue(r.isEmpty());
+
+ // "past" scan does not see rows behind delete marker
+ Scan s = new Scan();
+ s.setMaxVersions();
+ s.setTimeRange(0L, ts+1);
+ InternalScanner scanner = region.getScanner(s);
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ while(scanner.next(kvs));
+ assertTrue(kvs.isEmpty());
+
+ // flushing and minor compaction keep delete markers
+ region.flushcache();
+ region.compactStores();
+ assertEquals(1, countDeleteMarkers(region));
+ region.compactStores(true);
+ // major compaction deleted it
+ assertEquals(0, countDeleteMarkers(region));
+ }
+
+ /**
+ * The ExplicitColumnTracker does not support "raw" scanning.
+ */
+ public void testRawScanWithColumns() throws Exception {
+ HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
+ HConstants.FOREVER, true);
+ HRegion region = createNewHRegion(htd, null, null);
+
+ Scan s = new Scan();
+ s.setRaw(true);
+ s.setMaxVersions();
+ s.addColumn(c0, c0);
+
+ try {
+ InternalScanner scan = region.getScanner(s);
+ fail("raw scanner with columns should have failed");
+ } catch (DoNotRetryIOException dnre) {
+ // ok!
+ }
+ }
+
+ /**
+ * Verify that "raw" scanning mode return delete markers and deletes rows.
+ */
+ public void testRawScan() throws Exception {
+ HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
+ HConstants.FOREVER, true);
+ HRegion region = createNewHRegion(htd, null, null);
+
+ long ts = System.currentTimeMillis();
+ Put p = new Put(T1, ts);
+ p.add(c0, c0, T1);
+ region.put(p);
+ p = new Put(T1, ts+2);
+ p.add(c0, c0, T2);
+ region.put(p);
+ p = new Put(T1, ts+4);
+ p.add(c0, c0, T3);
+ region.put(p);
+
+ Delete d = new Delete(T1, ts+1, null);
+ region.delete(d, null, true);
+
+ d = new Delete(T1, ts+2, null);
+ d.deleteColumn(c0, c0, ts+2);
+ region.delete(d, null, true);
+
+ d = new Delete(T1, ts+3, null);
+ d.deleteColumns(c0, c0, ts+3);
+ region.delete(d, null, true);
+
+ Scan s = new Scan();
+ s.setRaw(true);
+ s.setMaxVersions();
+ InternalScanner scan = region.getScanner(s);
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ scan.next(kvs);
+ assertTrue(kvs.get(0).isDeleteFamily());
+ // compare byte[] contents; assertEquals would only compare references
+ assertTrue(Bytes.equals(kvs.get(1).getValue(), T3));
+ assertTrue(kvs.get(2).isDelete());
+ assertTrue(kvs.get(3).isDeleteType());
+ assertTrue(Bytes.equals(kvs.get(4).getValue(), T2));
+ assertTrue(Bytes.equals(kvs.get(5).getValue(), T1));
+ }
+
+ /**
+ * Verify that delete markers are removed from an otherwise empty store.
+ */
+ public void testDeleteMarkerExpirationEmptyStore() throws Exception {
+ HTableDescriptor htd = createTableDescriptor(getName(), 0, 1,
+ HConstants.FOREVER, true);
+ HRegion region = createNewHRegion(htd, null, null);
+
+ long ts = System.currentTimeMillis();
+
+ Delete d = new Delete(T1, ts, null);
+ d.deleteColumns(c0, c0, ts);
+ region.delete(d, null, true);
+
+ d = new Delete(T1, ts, null);
+ d.deleteFamily(c0);
+ region.delete(d, null, true);
+
+ d = new Delete(T1, ts, null);
+ d.deleteColumn(c0, c0, ts+1);
+ region.delete(d, null, true);
+
+ d = new Delete(T1, ts, null);
+ d.deleteColumn(c0, c0, ts+2);
+ region.delete(d, null, true);
+
+ // 1 family marker, 1 column marker, 2 version markers
+ assertEquals(4, countDeleteMarkers(region));
+
+ // neither flush nor minor compaction removes any marker
+ region.flushcache();
+ assertEquals(4, countDeleteMarkers(region));
+ region.compactStores(false);
+ assertEquals(4, countDeleteMarkers(region));
+
+ // major compaction removes all, since there are no puts they affect
+ region.compactStores(true);
+ assertEquals(0, countDeleteMarkers(region));
+ }
+
+ /**
+ * Test delete marker removal from store files.
+ */
+ public void testDeleteMarkerExpiration() throws Exception {
+ HTableDescriptor htd = createTableDescriptor(getName(), 0, 1,
+ HConstants.FOREVER, true);
+ HRegion region = createNewHRegion(htd, null, null);
+
+ long ts = System.currentTimeMillis();
+
+ Put p = new Put(T1, ts);
+ p.add(c0, c0, T1);
+ region.put(p);
+
+ // a put into another store (CF) should have no effect
+ p = new Put(T1, ts-10);
+ p.add(c1, c0, T1);
+ region.put(p);
+
+ // all the following deletes affect the put
+ Delete d = new Delete(T1, ts, null);
+ d.deleteColumns(c0, c0, ts);
+ region.delete(d, null, true);
+
+ d = new Delete(T1, ts, null);
+ d.deleteFamily(c0, ts);
+ region.delete(d, null, true);
+
+ d = new Delete(T1, ts, null);
+ d.deleteColumn(c0, c0, ts+1);
+ region.delete(d, null, true);
+
+ d = new Delete(T1, ts, null);
+ d.deleteColumn(c0, c0, ts+2);
+ region.delete(d, null, true);
+
+ // 1 family marker, 1 column marker, 2 version markers
+ assertEquals(4, countDeleteMarkers(region));
+
+ region.flushcache();
+ assertEquals(4, countDeleteMarkers(region));
+ region.compactStores(false);
+ assertEquals(4, countDeleteMarkers(region));
+
+ // another put will push out the earlier put...
+ p = new Put(T1, ts+3);
+ p.add(c0, c0, T1);
+ region.put(p);
+
+ region.flushcache();
+ // no markers are collected, since there is an affected put
+ region.compactStores(true);
+ assertEquals(4, countDeleteMarkers(region));
+
+ // the last compaction collected the earlier put,
+ // so after this compaction all markers are gone
+ region.compactStores(true);
+ assertEquals(0, countDeleteMarkers(region));
+ }
+
+ /**
+ * Verify correct range demarcation
+ */
+ public void testRanges() throws Exception {
+ HTableDescriptor htd = createTableDescriptor(getName(), 0, 3,
+ HConstants.FOREVER, true);
+ HRegion region = createNewHRegion(htd, null, null);
+
+ long ts = System.currentTimeMillis();
+ Put p = new Put(T1, ts);
+ p.add(c0, c0, T1);
+ p.add(c0, c1, T1);
+ p.add(c1, c0, T1);
+ p.add(c1, c1, T1);
+ region.put(p);
+
+ p = new Put(T2, ts);
+ p.add(c0, c0, T1);
+ p.add(c0, c1, T1);
+ p.add(c1, c0, T1);
+ p.add(c1, c1, T1);
+ region.put(p);
+
+ p = new Put(T1, ts+1);
+ p.add(c0, c0, T2);
+ p.add(c0, c1, T2);
+ p.add(c1, c0, T2);
+ p.add(c1, c1, T2);
+ region.put(p);
+
+ p = new Put(T2, ts+1);
+ p.add(c0, c0, T2);
+ p.add(c0, c1, T2);
+ p.add(c1, c0, T2);
+ p.add(c1, c1, T2);
+ region.put(p);
+
+ Delete d = new Delete(T1, ts+1, null);
+ d.deleteColumns(c0, c0, ts+1);
+ region.delete(d, null, true);
+
+ d = new Delete(T1, ts+1, null);
+ d.deleteFamily(c1, ts+1);
+ region.delete(d, null, true);
+
+ d = new Delete(T2, ts+1, null);
+ d.deleteFamily(c0, ts+1);
+ region.delete(d, null, true);
+
+ // add an older delete, to make sure it is filtered
+ d = new Delete(T1, ts-10, null);
+ d.deleteFamily(c1, ts-10);
+ region.delete(d, null, true);
+
+ // ts + 2 does NOT include the delete at ts+1
+ checkGet(region, T1, c0, c0, ts+2, T2, T1);
+ checkGet(region, T1, c0, c1, ts+2, T2, T1);
+ checkGet(region, T1, c1, c0, ts+2, T2, T1);
+ checkGet(region, T1, c1, c1, ts+2, T2, T1);
+
+ checkGet(region, T2, c0, c0, ts+2, T2, T1);
+ checkGet(region, T2, c0, c1, ts+2, T2, T1);
+ checkGet(region, T2, c1, c0, ts+2, T2, T1);
+ checkGet(region, T2, c1, c1, ts+2, T2, T1);
+
+ // ts + 3 does
+ checkGet(region, T1, c0, c0, ts+3);
+ checkGet(region, T1, c0, c1, ts+3, T2, T1);
+ checkGet(region, T1, c1, c0, ts+3);
+ checkGet(region, T1, c1, c1, ts+3);
+
+ checkGet(region, T2, c0, c0, ts+3);
+ checkGet(region, T2, c0, c1, ts+3);
+ checkGet(region, T2, c1, c0, ts+3, T2, T1);
+ checkGet(region, T2, c1, c1, ts+3, T2, T1);
+ }
+
+ /**
+ * Verify that column/version delete markers are sorted
+ * with their respective puts and removed correctly by
+ * versioning (i.e. not relying on the store's earliestPutTs).
+ */
+ public void testDeleteMarkerVersioning() throws Exception {
+ HTableDescriptor htd = createTableDescriptor(getName(), 0, 1,
+ HConstants.FOREVER, true);
+ HRegion region = createNewHRegion(htd, null, null);
+
+ long ts = System.currentTimeMillis();
+ Put p = new Put(T1, ts);
+ p.add(c0, c0, T1);
+ region.put(p);
+
+ // this prevents marker collection based on earliestPut
+ // (cannot keep earliest put per column in the store file)
+ p = new Put(T1, ts-10);
+ p.add(c0, c1, T1);
+ region.put(p);
+
+ Delete d = new Delete(T1, ts, null);
+ // test corner case (Put and Delete have same TS)
+ d.deleteColumns(c0, c0, ts);
+ region.delete(d, null, true);
+
+ d = new Delete(T1, ts+1, null);
+ d.deleteColumn(c0, c0, ts+1);
+ region.delete(d, null, true);
+
+ d = new Delete(T1, ts+3, null);
+ d.deleteColumn(c0, c0, ts+3);
+ region.delete(d, null, true);
+
+ region.flushcache();
+ region.compactStores(true);
+ region.compactStores(true);
+ assertEquals(3, countDeleteMarkers(region));
+
+ // add two more puts; since max versions is 1,
+ // the 2nd put (and all delete markers following it)
+ // will be removed.
+ p = new Put(T1, ts+2);
+ p.add(c0, c0, T2);
+ region.put(p);
+
+ // delete, put, delete, delete, put
+ assertEquals(3, countDeleteMarkers(region));
+
+ p = new Put(T1, ts+3);
+ p.add(c0, c0, T3);
+ region.put(p);
+
+ // This is potentially questionable behavior.
+ // This could be changed by not letting the ScanQueryMatcher
+ // return SEEK_NEXT_COL if a put is past VERSIONS, but instead
+ // return SKIP if the store has KEEP_DELETED_CELLS set.
+ //
+ // As it stands, the 1 here is correct.
+ // There are two puts, VERSIONS is one, so after the 1st put the scanner
+ // knows that there can be no more KVs (put or delete) that have any effect.
+ //
+ // delete, put, put | delete, delete
+ assertEquals(1, countDeleteMarkers(region));
+
+ // flush cache only sees what is in the memstore
+ region.flushcache();
+
+ // Here we have the three markers again, because the flush above
+ // removed the 2nd put before the file was written.
+ // So there's only one put, and hence the deletes already in the store
+ // files cannot be removed safely.
+ // delete, put, delete, delete
+ assertEquals(3, countDeleteMarkers(region));
+
+ region.compactStores(true);
+ assertEquals(3, countDeleteMarkers(region));
+
+ // add one more put
+ p = new Put(T1, ts+4);
+ p.add(c0, c0, T4);
+ region.put(p);
+
+ region.flushcache();
+ // one trailing delete marker remains (but only one)
+ // because delete markers do not increase the version count
+ assertEquals(1, countDeleteMarkers(region));
+ region.compactStores(true);
+ region.compactStores(true);
+ assertEquals(1, countDeleteMarkers(region));
+ }
+
+ /**
+ * Verify scenarios with multiple CFs and columns
+ */
+ public void testWithMixedCFs() throws Exception {
+ HTableDescriptor htd = createTableDescriptor(getName(), 0, 1,
+ HConstants.FOREVER, true);
+ HRegion region = createNewHRegion(htd, null, null);
+
+ long ts = System.currentTimeMillis();
+
+ Put p = new Put(T1, ts);
+ p.add(c0, c0, T1);
+ p.add(c0, c1, T1);
+ p.add(c1, c0, T1);
+ p.add(c1, c1, T1);
+ region.put(p);
+
+ p = new Put(T2, ts+1);
+ p.add(c0, c0, T2);
+ p.add(c0, c1, T2);
+ p.add(c1, c0, T2);
+ p.add(c1, c1, T2);
+ region.put(p);
+
+ // these row deletes place a family delete marker in each family
+ Delete d = new Delete(T1, ts, null);
+ region.delete(d, null, true);
+
+ d = new Delete(T2, ts+1, null);
+ region.delete(d, null, true);
+
+ Scan s = new Scan(T1);
+ s.setTimeRange(0, ts+1);
+ InternalScanner scanner = region.getScanner(s);
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ scanner.next(kvs);
+ assertEquals(4, kvs.size());
+ scanner.close();
+
+ s = new Scan(T2);
+ s.setTimeRange(0, ts+2);
+ scanner = region.getScanner(s);
+ kvs = new ArrayList<KeyValue>();
+ scanner.next(kvs);
+ assertEquals(4, kvs.size());
+ scanner.close();
+ }
+
+ /**
+ * Test keeping deleted rows together with min versions set
+ * @throws Exception
+ */
+ public void testWithMinVersions() throws Exception {
+ HTableDescriptor htd = createTableDescriptor(getName(), 3, 1000, 1, true);
+ HRegion region = createNewHRegion(htd, null, null);
+
+ long ts = System.currentTimeMillis() - 2000; // 2s in the past
+
+ Put p = new Put(T1, ts);
+ p.add(c0, c0, T3);
+ region.put(p);
+ p = new Put(T1, ts-1);
+ p.add(c0, c0, T2);
+ region.put(p);
+ p = new Put(T1, ts-3);
+ p.add(c0, c0, T1);
+ region.put(p);
+ p = new Put(T1, ts-4);
+ p.add(c0, c0, T0);
+ region.put(p);
+
+ // all puts are now retained only because of min versions = 3
+
+ // place a family delete marker
+ Delete d = new Delete(T1, ts-1, null);
+ region.delete(d, null, true);
+ // and a column delete marker
+ d = new Delete(T1, ts-2, null);
+ d.deleteColumns(c0, c0, ts-1);
+ region.delete(d, null, true);
+
+ Get g = new Get(T1);
+ g.setMaxVersions();
+ g.setTimeRange(0L, ts-2);
+ Result r = region.get(g, null);
+ checkResult(r, c0, c0, T1,T0);
+
+ // 3 family delete markers (one per family), 1 column delete marker
+ assertEquals(4, countDeleteMarkers(region));
+
+ region.flushcache();
+ // no delete markers are removed by the flush
+ assertEquals(4, countDeleteMarkers(region));
+
+ r = region.get(g, null);
+ checkResult(r, c0, c0, T1);
+ p = new Put(T1, ts+1);
+ p.add(c0, c0, T4);
+ region.put(p);
+ region.flushcache();
+
+ assertEquals(4, countDeleteMarkers(region));
+
+ r = region.get(g, null);
+ checkResult(r, c0, c0, T1);
+
+ // this will push out the last put before the
+ // family delete marker
+ p = new Put(T1, ts+2);
+ p.add(c0, c0, T5);
+ region.put(p);
+
+ region.flushcache();
+ region.compactStores(true);
+ // the two family markers without puts are gone
+ assertEquals(2, countDeleteMarkers(region));
+
+ // the last compactStores updated the earliestPutTs,
+ // so after the next compaction the last family delete marker is also gone
+ region.compactStores(true);
+ assertEquals(0, countDeleteMarkers(region));
+ }
+
+ private void checkGet(HRegion region, byte[] row, byte[] fam, byte[] col,
+ long time, byte[]... vals) throws IOException {
+ Get g = new Get(row);
+ g.addColumn(fam, col);
+ g.setMaxVersions();
+ g.setTimeRange(0L, time);
+ Result r = region.get(g, null);
+ checkResult(r, fam, col, vals);
+ }
+
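+ /**
+ * Counts all delete markers (family, column, and version markers)
+ * by running a raw scan over the whole region.
+ */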
+ private int countDeleteMarkers(HRegion region) throws IOException {
+ Scan s = new Scan();
+ s.setRaw(true);
+ s.setMaxVersions();
+ InternalScanner scan = region.getScanner(s);
+ List<KeyValue> kvs = new ArrayList<KeyValue>();
+ int res = 0;
+ boolean hasMore;
+ do {
+ hasMore = scan.next(kvs);
+ for (KeyValue kv : kvs) {
+ if(kv.isDelete()) res++;
+ }
+ kvs.clear();
+ } while (hasMore);
+ scan.close();
+ return res;
+ }
+
+ private void checkResult(Result r, byte[] fam, byte[] col, byte[] ... vals) {
+ assertEquals(vals.length, r.size());
+ List<KeyValue> kvs = r.getColumn(fam, col);
+ assertEquals(vals.length, kvs.size());
+ for (int i = 0; i < vals.length; i++) {
+ // compare byte[] contents; assertEquals would only compare references
+ assertTrue(Bytes.equals(kvs.get(i).getValue(), vals[i]));
+ }
+ }
+
+}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java?rev=1187531&r1=1187530&r2=1187531&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/regionserver/TestMemStore.java Fri Oct 21 19:57:07 2011
@@ -26,8 +26,6 @@ import java.rmi.UnexpectedException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.NavigableSet;
-import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
@@ -39,8 +37,9 @@ import org.apache.hadoop.hbase.HBaseConf
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValueTestUtil;
-import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.regionserver.Store.ScanInfo;
+import org.apache.hadoop.hbase.regionserver.StoreScanner.ScanType;
import org.apache.hadoop.hbase.util.Bytes;
import com.google.common.base.Joiner;
@@ -89,8 +88,10 @@ public class TestMemStore extends TestCa
Scan scan = new Scan();
List<KeyValue> result = new ArrayList<KeyValue>();
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
- StoreScanner s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
- this.memstore.comparator, null, memstorescanners);
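+ // StoreScanner now takes a ScanInfo (family, minVersions, maxVersions,
+ // ttl, keepDeletedCells, comparator) plus an explicit ScanType: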
+ ScanInfo scanInfo = new ScanInfo(null, 0, 1, HConstants.LATEST_TIMESTAMP, false,
+ this.memstore.comparator);
+ ScanType scanType = ScanType.USER_SCAN;
+ StoreScanner s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
int count = 0;
try {
while (s.next(result)) {
@@ -111,8 +112,7 @@ public class TestMemStore extends TestCa
ReadWriteConsistencyControl.resetThreadReadPoint(rwcc);
memstorescanners = this.memstore.getScanners();
// Now assert can count same number even if a snapshot mid-scan.
- s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
- this.memstore.comparator, null, memstorescanners);
+ s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
count = 0;
try {
while (s.next(result)) {
@@ -138,8 +138,7 @@ public class TestMemStore extends TestCa
memstorescanners = this.memstore.getScanners();
// Assert that new values are seen in kvset as we scan.
long ts = System.currentTimeMillis();
- s = new StoreScanner(scan, null, HConstants.LATEST_TIMESTAMP,
- this.memstore.comparator, null, memstorescanners);
+ s = new StoreScanner(scan, scanInfo, scanType, null, memstorescanners);
count = 0;
int snapshotIndex = 5;
try {
@@ -553,10 +552,12 @@ public class TestMemStore extends TestCa
}
//starting from each row, validate results should contain the starting row
for (int startRowId = 0; startRowId < ROW_COUNT; startRowId++) {
- InternalScanner scanner =
- new StoreScanner(new Scan(Bytes.toBytes(startRowId)), FAMILY,
- Integer.MAX_VALUE, this.memstore.comparator, null,
- memstore.getScanners());
+ ScanInfo scanInfo = new ScanInfo(FAMILY, 0, 1, Integer.MAX_VALUE, false,
+ this.memstore.comparator);
+ ScanType scanType = ScanType.USER_SCAN;
+ InternalScanner scanner = new StoreScanner(new Scan(
+ Bytes.toBytes(startRowId)), scanInfo, scanType, null,
+ memstore.getScanners());
List<KeyValue> results = new ArrayList<KeyValue>();
for (int i = 0; scanner.next(results); i++) {
int rowId = startRowId + i;