You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2013/03/13 19:38:31 UTC
svn commit: r1456082 - in /hbase/trunk/hbase-server/src:
main/java/org/apache/hadoop/hbase/regionserver/
test/java/org/apache/hadoop/hbase/regionserver/
Author: sershe
Date: Wed Mar 13 18:38:31 2013
New Revision: 1456082
URL: http://svn.apache.org/r1456082
Log:
HBASE-8056 allow StoreScanner to drop deletes from some part of the compaction range
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java?rev=1456082&r1=1456081&r2=1456082&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScanQueryMatcher.java Wed Mar 13 18:38:31 2013
@@ -33,6 +33,8 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import com.google.common.base.Preconditions;
+
/**
* A query matcher that is specifically designed for the scan case.
*/
@@ -64,7 +66,8 @@ public class ScanQueryMatcher {
* marker to reach deleted rows.
*/
/** whether to retain delete markers */
- private final boolean retainDeletesInOutput;
+ // Not final: checkPartialDropDeleteRange() sets this to false on entering the partial
+ // drop-deletes row range and back to true on leaving it.
+ private boolean retainDeletesInOutput;
+
/** whether to return deleted rows */
private final boolean keepDeletedCells;
/** whether time range queries can see rows "behind" a delete */
@@ -97,6 +100,8 @@ public class ScanQueryMatcher {
/** readPoint over which the KVs are unconditionally included */
protected long maxReadPointToTrackVersions;
+  /**
+   * Inclusive start row of the partial drop-deletes range, or null when no such range is
+   * configured. checkPartialDropDeleteRange resets it to null once the scan reaches it.
+   */
+  private byte[] dropDeletesFromRow = null;
+  /**
+   * Exclusive end row of the partial drop-deletes range, or null when no such range is
+   * configured; EMPTY_END_ROW means the range is open-ended. checkPartialDropDeleteRange
+   * resets it to null once the scan reaches it.
+   */
+  private byte[] dropDeletesToRow = null;
+
/**
* This variable shows whether there is an null column in the query. There
* always exists a null column in the wildcard column query.
@@ -179,6 +184,27 @@ public class ScanQueryMatcher {
}
}
+  /**
+   * Construct a QueryMatcher for a compaction scan that drops deletes only from a limited
+   * range of rows. Outside that range the matcher behaves as for
+   * {@link ScanType#COMPACT_RETAIN_DELETES} and keeps delete markers.
+   *
+   * @param scan the scan being matched
+   * @param scanInfo The store's immutable scan info
+   * @param columns the columns to match; passed through to the delegate constructor
+   * @param readPointToUse read point passed through to the delegate constructor
+   * @param earliestPutTs Earliest put seen in any of the store files.
+   * @param oldestUnexpiredTS the oldest timestamp we are interested in, based on TTL
+   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
+   * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
+   * @throws IllegalArgumentException if either bound is null
+   */
+  public ScanQueryMatcher(Scan scan, ScanInfo scanInfo, NavigableSet<byte[]> columns,
+      long readPointToUse, long earliestPutTs, long oldestUnexpiredTS,
+      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) {
+    // Start out retaining deletes; checkPartialDropDeleteRange switches to dropping them
+    // while the scan is inside [dropDeletesFromRow, dropDeletesToRow).
+    this(scan, scanInfo, columns, ScanType.COMPACT_RETAIN_DELETES, readPointToUse, earliestPutTs,
+        oldestUnexpiredTS);
+    Preconditions.checkArgument((dropDeletesFromRow != null) && (dropDeletesToRow != null),
+        "dropDeletesFromRow and dropDeletesToRow must both be non-null");
+    this.dropDeletesFromRow = dropDeletesFromRow;
+    this.dropDeletesToRow = dropDeletesToRow;
+  }
+
/*
* Constructor for tests
*/
@@ -372,6 +398,33 @@ public class ScanQueryMatcher {
}
+  /**
+   * Handles partial drop-deletes. Keys are matched in sorted order, so when a range is
+   * configured from which deletes may be dropped, retainDeletesInOutput can be set to false
+   * for the duration of that range only while keeping the output consistent.
+   *
+   * <p>Open (empty) bounds are recognized by length rather than by reference identity. An
+   * empty array that is not the {@link HConstants#EMPTY_START_ROW} /
+   * {@link HConstants#EMPTY_END_ROW} instance (for example one deserialized from metadata)
+   * is therefore still treated as unbounded instead of terminating the range on the first row.
+   *
+   * @param row buffer holding the current row key
+   * @param offset offset of the row key within {@code row}
+   * @param length length of the row key
+   */
+  private void checkPartialDropDeleteRange(byte [] row, int offset, short length) {
+    // If partial-drop-deletes are used, initially dropDeletesFromRow and dropDeletesToRow are
+    // both set, and the matcher is set to retain deletes. We assume ordered keys. When
+    // dropDeletesFromRow is leq the current row, we start dropping deletes and reset
+    // dropDeletesFromRow; thus the 2nd "if" starts to apply.
+    if ((dropDeletesFromRow != null)
+        && ((dropDeletesFromRow.length == 0)
+          || (Bytes.compareTo(row, offset, length,
+              dropDeletesFromRow, 0, dropDeletesFromRow.length) >= 0))) {
+      retainDeletesInOutput = false;
+      dropDeletesFromRow = null;
+    }
+    // If dropDeletesFromRow is null and dropDeletesToRow is set, we are inside the partial-
+    // drop-deletes range. An empty dropDeletesToRow means the range is open on the right, so
+    // we never leave it (an empty array would otherwise compare leq every row). Otherwise,
+    // when dropDeletesToRow is leq the current row, we stop dropping deletes and reset
+    // dropDeletesToRow so that we don't do any more compares.
+    if ((dropDeletesFromRow == null)
+        && (dropDeletesToRow != null) && (dropDeletesToRow.length != 0)
+        && (Bytes.compareTo(row, offset, length,
+            dropDeletesToRow, 0, dropDeletesToRow.length) >= 0)) {
+      retainDeletesInOutput = true;
+      dropDeletesToRow = null;
+    }
+  }
+
public boolean moreRowsMayExistAfter(KeyValue kv) {
if (!Bytes.equals(stopRow , HConstants.EMPTY_END_ROW) &&
rowComparator.compareRows(kv.getBuffer(),kv.getRowOffset(),
@@ -389,6 +442,7 @@ public class ScanQueryMatcher {
* @param row
*/
public void setRow(byte [] row, int offset, short length) {
+ checkPartialDropDeleteRange(row, offset, length);
this.row = row;
this.rowOffset = offset;
this.rowLength = length;
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java?rev=1456082&r1=1456081&r2=1456082&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java Wed Mar 13 18:38:31 2013
@@ -171,7 +171,7 @@ public class StoreScanner extends NonLaz
}
/**
- * Used for major compactions.<p>
+ * Used for compactions.<p>
*
* Opens a scanner across specified StoreFiles.
* @param store who we scan
@@ -183,10 +183,39 @@ public class StoreScanner extends NonLaz
public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
List<? extends KeyValueScanner> scanners, ScanType scanType,
long smallestReadPoint, long earliestPutTs) throws IOException {
+ // No partial drop-deletes range: null bounds make the private constructor build a plain
+ // ScanQueryMatcher for the given scanType.
+ this(store, scanInfo, scan, scanners, scanType, smallestReadPoint, earliestPutTs, null, null);
+ }
+
+  /**
+   * Used for compactions that drop deletes from a limited range of rows.<p>
+   *
+   * Opens a scanner across specified StoreFiles. Delete markers for rows outside
+   * [dropDeletesFromRow, dropDeletesToRow) are retained, as with
+   * {@link ScanType#COMPACT_RETAIN_DELETES}.
+   * @param store who we scan
+   * @param scanInfo the store's scan info, used to build the query matcher
+   * @param scan the spec
+   * @param scanners ancillary scanners
+   * @param smallestReadPoint the readPoint that we should use for tracking versions
+   * @param earliestPutTs Earliest put seen in any of the store files.
+   * @param dropDeletesFromRow The inclusive left bound of the range; can be EMPTY_START_ROW.
+   *   Should not be null: a null value makes the scanner ignore the range entirely and
+   *   retain all deletes.
+   * @param dropDeletesToRow The exclusive right bound of the range; can be EMPTY_END_ROW.
+   *   Must not be null when dropDeletesFromRow is set.
+   * @throws IllegalArgumentException if dropDeletesFromRow is non-null but dropDeletesToRow
+   *   is null (rejected by the ScanQueryMatcher constructor)
+   */
+  public StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+      List<? extends KeyValueScanner> scanners, long smallestReadPoint, long earliestPutTs,
+      byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
+    // Deletes are retained by default; the matcher drops them only inside the given range.
+    this(store, scanInfo, scan, scanners, ScanType.COMPACT_RETAIN_DELETES, smallestReadPoint,
+        earliestPutTs, dropDeletesFromRow, dropDeletesToRow);
+  }
+
+ private StoreScanner(Store store, ScanInfo scanInfo, Scan scan,
+ List<? extends KeyValueScanner> scanners, ScanType scanType, long smallestReadPoint,
+ long earliestPutTs, byte[] dropDeletesFromRow, byte[] dropDeletesToRow) throws IOException {
this(store, false, scan, null, scanInfo.getTtl(),
scanInfo.getMinVersions());
- matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,
- smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
+ if (dropDeletesFromRow == null) {
+ matcher = new ScanQueryMatcher(scan, scanInfo, null, scanType,
+ smallestReadPoint, earliestPutTs, oldestUnexpiredTS);
+ } else {
+ matcher = new ScanQueryMatcher(scan, scanInfo, null, smallestReadPoint,
+ earliestPutTs, oldestUnexpiredTS, dropDeletesFromRow, dropDeletesToRow);
+ }
// Filter the list of scanners using Bloom filters, time range, TTL, etc.
scanners = selectScannersFrom(scanners);
@@ -380,6 +409,7 @@ public class StoreScanner extends NonLaz
assert prevKV == null || comparator == null || comparator.compare(prevKV, kv) <= 0 :
"Key " + prevKV + " followed by a " + "smaller key " + kv + " in cf " + store;
prevKV = kv;
+
ScanQueryMatcher.MatchCode qcode = matcher.match(kv);
switch(qcode) {
case INCLUDE:
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java?rev=1456082&r1=1456081&r2=1456082&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestQueryMatcher.java Wed Mar 13 18:38:31 2013
@@ -22,10 +22,15 @@ package org.apache.hadoop.hbase.regionse
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import java.util.NavigableSet;
+
+import static org.apache.hadoop.hbase.regionserver.ScanQueryMatcher.MatchCode.*;
import org.apache.hadoop.hbase.HBaseTestCase;
+import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.KeyValue.KVComparator;
+import org.apache.hadoop.hbase.KeyValue.Type;
import org.apache.hadoop.hbase.SmallTests;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Scan;
@@ -40,6 +45,7 @@ public class TestQueryMatcher extends HB
private byte[] row1;
private byte[] row2;
+ private byte[] row3;
private byte[] fam1;
private byte[] fam2;
private byte[] col1;
@@ -60,6 +66,7 @@ public class TestQueryMatcher extends HB
super.setUp();
row1 = Bytes.toBytes("row1");
row2 = Bytes.toBytes("row2");
+ row3 = Bytes.toBytes("row3");
fam1 = Bytes.toBytes("fam1");
fam2 = Bytes.toBytes("fam2");
col1 = Bytes.toBytes("col1");
@@ -283,5 +290,56 @@ public class TestQueryMatcher extends HB
}
}
+  /**
+   * Checks partial-range drop-deletes. Every KV fed to the matcher is a Delete; deletes on
+   * rows inside [from, to) are expected to be SKIPped (dropped) and deletes on rows outside
+   * that range INCLUDEd (retained).
+   */
+  public void testMatch_PartialRangeDropDeletes() throws Exception {
+    // Some ranges.
+    testDropDeletes(
+        row2, row3, new byte[][] { row1, row2, row2, row3 }, INCLUDE, SKIP, SKIP, INCLUDE);
+    testDropDeletes(row2, row3, new byte[][] { row1, row1, row2 }, INCLUDE, INCLUDE, SKIP);
+    testDropDeletes(row2, row3, new byte[][] { row2, row3, row3 }, SKIP, INCLUDE, INCLUDE);
+    testDropDeletes(row1, row3, new byte[][] { row1, row2, row3 }, SKIP, SKIP, INCLUDE);
+    // Open ranges.
+    testDropDeletes(HConstants.EMPTY_START_ROW, row3,
+        new byte[][] { row1, row2, row3 }, SKIP, SKIP, INCLUDE);
+    testDropDeletes(row2, HConstants.EMPTY_END_ROW,
+        new byte[][] { row1, row2, row3 }, INCLUDE, SKIP, SKIP);
+    testDropDeletes(HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW,
+        new byte[][] { row1, row2, row3, row3 }, SKIP, SKIP, SKIP, SKIP);
+
+    // No KVs in range.
+    testDropDeletes(row2, row3, new byte[][] { row1, row1, row3 }, INCLUDE, INCLUDE, INCLUDE);
+    testDropDeletes(row2, row3, new byte[][] { row3, row3 }, INCLUDE, INCLUDE);
+    testDropDeletes(row2, row3, new byte[][] { row1, row1 }, INCLUDE, INCLUDE);
+  }
+
+  /**
+   * Feeds one Delete KeyValue per entry of {@code rows} through a partial drop-deletes
+   * matcher and asserts the resulting match codes, in order.
+   *
+   * @param from inclusive start of the drop-deletes range
+   * @param to exclusive end of the drop-deletes range
+   * @param rows row keys in scan order; consecutive duplicates share a single setRow() call
+   * @param expected the match code expected for each entry of {@code rows}
+   */
+  private void testDropDeletes(
+      byte[] from, byte[] to, byte[][] rows, MatchCode... expected) throws IOException {
+    final long ts = EnvironmentEdgeManager.currentTimeMillis();
+    // A negative time-to-purge-deletes keeps delete purging from ever kicking in.
+    final ScanInfo info = new ScanInfo(fam2, 0, 1, ttl, false, -1L, rowComparator);
+    final NavigableSet<byte[]> columns = get.getFamilyMap().get(fam2);
+    final ScanQueryMatcher matcher = new ScanQueryMatcher(scan, info, columns, Long.MAX_VALUE,
+        HConstants.OLDEST_TIMESTAMP, HConstants.OLDEST_TIMESTAMP, from, to);
+
+    final List<ScanQueryMatcher.MatchCode> observed =
+        new ArrayList<ScanQueryMatcher.MatchCode>(rows.length);
+    byte[] lastRow = null;
+    for (int r = 0; r < rows.length; r++) {
+      final byte[] current = rows[r];
+      // Announce a row to the matcher only when it changes, as a real scan would.
+      final boolean rowChanged = lastRow == null || !Bytes.equals(lastRow, current);
+      if (rowChanged) {
+        matcher.setRow(current, 0, (short) current.length);
+        lastRow = current;
+      }
+      final KeyValue deleteKv = new KeyValue(current, fam2, null, ts, Type.Delete);
+      observed.add(matcher.match(deleteKv));
+    }
+
+    assertEquals(expected.length, observed.size());
+    for (int k = 0; k < expected.length; k++) {
+      final ScanQueryMatcher.MatchCode want = expected[k];
+      final ScanQueryMatcher.MatchCode got = observed.get(k);
+      if (PRINT) {
+        System.out.println("expected " + want + ", actual " + got);
+      }
+      assertEquals(want, got);
+    }
+  }
}