You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2017/08/11 03:03:25 UTC
hbase git commit: HBASE-17125 Inconsistent result when use filter to read data
Repository: hbase
Updated Branches:
refs/heads/master c37432fef -> 4dd24c52b
HBASE-17125 Inconsistent result when use filter to read data
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/4dd24c52
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/4dd24c52
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/4dd24c52
Branch: refs/heads/master
Commit: 4dd24c52b84c74a477e00ab6177d081c29462dd8
Parents: c37432f
Author: Guanghao Zhang <zg...@apache.org>
Authored: Thu Aug 10 21:03:50 2017 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Fri Aug 11 10:58:00 2017 +0800
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/client/Get.java | 34 +++-
.../org/apache/hadoop/hbase/client/Query.java | 6 +-
.../org/apache/hadoop/hbase/client/Scan.java | 29 ++-
.../querymatcher/ScanQueryMatcher.java | 7 +-
.../querymatcher/ScanWildcardColumnTracker.java | 7 +-
.../querymatcher/UserScanQueryMatcher.java | 166 +++++++++++-----
.../hadoop/hbase/client/TestFromClientSide.java | 195 +++++++++++++++++++
.../hadoop/hbase/regionserver/TestHRegion.java | 64 ++++++
.../hbase/regionserver/TestMinVersions.java | 8 +-
.../hadoop/hbase/regionserver/TestStore.java | 19 +-
10 files changed, 463 insertions(+), 72 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dd24c52/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
index b774a9a..086a0b4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Get.java
@@ -267,10 +267,12 @@ public class Get extends Query
/**
* Get all available versions.
* @return this for invocation chaining
+ * @deprecated The name is easily confused with the column family's max versions; use
+ * {@link #readAllVersions()} instead.
*/
+ @Deprecated
public Get setMaxVersions() {
- this.maxVersions = Integer.MAX_VALUE;
- return this;
+ return readAllVersions();
}
/**
@@ -278,12 +280,34 @@ public class Get extends Query
* @param maxVersions maximum versions for each column
* @throws IOException if invalid number of versions
* @return this for invocation chaining
+ * @deprecated The name is easily confused with the column family's max versions; use
+ * {@link #readVersions(int)} instead.
*/
+ @Deprecated
public Get setMaxVersions(int maxVersions) throws IOException {
- if(maxVersions <= 0) {
- throw new IOException("maxVersions must be positive");
+ return readVersions(maxVersions);
+ }
+
+ /**
+ * Get all available versions of each column (the limit is set to Integer.MAX_VALUE).
+ * @return this for invocation chaining
+ */
+ public Get readAllVersions() {
+ this.maxVersions = Integer.MAX_VALUE;
+ return this;
+ }
+
+ /**
+ * Get up to the specified number of versions of each column.
+ * @param versions maximum number of versions to return for each column; must be positive
+ * @throws IOException if {@code versions} is not positive
+ * @return this for invocation chaining
+ */
+ public Get readVersions(int versions) throws IOException {
+ if (versions <= 0) {
+ throw new IOException("versions must be positive");
}
- this.maxVersions = maxVersions;
+ this.maxVersions = versions;
return this;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dd24c52/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java
index 0bf54ae..cc9e9d4 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Query.java
@@ -53,9 +53,9 @@ public abstract class Query extends OperationWithAttributes {
}
/**
- * Apply the specified server-side filter when performing the Query.
- * Only {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)} is called AFTER all tests
- * for ttl, column match, deletes and max versions have been run.
+ * Apply the specified server-side filter when performing the Query. Only
+ * {@link Filter#filterKeyValue(org.apache.hadoop.hbase.Cell)} is called AFTER all tests for ttl,
+ * column match, deletes and the column family's max versions have been run.
* @param filter filter to run on the server
* @return this for invocation chaining
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dd24c52/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
index e84716f..5b75151 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Scan.java
@@ -593,19 +593,42 @@ public class Scan extends Query {
/**
* Get all available versions.
* @return this
+ * @deprecated The name is easily confused with the column family's max versions; use
+ * {@link #readAllVersions()} instead.
*/
+ @Deprecated
public Scan setMaxVersions() {
- this.maxVersions = Integer.MAX_VALUE;
- return this;
+ return readAllVersions();
}
/**
* Get up to the specified number of versions of each column.
* @param maxVersions maximum versions for each column
* @return this
+ * @deprecated The name is easily confused with the column family's max versions; use
+ * {@link #readVersions(int)} instead.
*/
+ @Deprecated
public Scan setMaxVersions(int maxVersions) {
- this.maxVersions = maxVersions;
+ return readVersions(maxVersions);
+ }
+
+ /**
+ * Get all available versions of each column (the limit is set to Integer.MAX_VALUE).
+ * @return this
+ */
+ public Scan readAllVersions() {
+ this.maxVersions = Integer.MAX_VALUE;
+ return this;
+ }
+
+ /**
+ * Get up to the specified number of versions of each column.
+ * @param versions maximum number of versions to return for each column; not validated here
+ * @return this
+ */
+ public Scan readVersions(int versions) {
+ this.maxVersions = versions;
return this;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dd24c52/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
index 8bdab08..524d3f7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanQueryMatcher.java
@@ -355,13 +355,16 @@ public abstract class ScanQueryMatcher implements ShipperListener {
NavigableSet<byte[]> columns, ScanInfo scanInfo, long oldestUnexpiredTS, Scan userScan)
throws IOException {
int resultMaxVersion = scanInfo.getMaxVersions();
+ int maxVersionToCheck = resultMaxVersion;
if (userScan != null) {
if (userScan.isRaw()) {
resultMaxVersion = userScan.getMaxVersions();
} else {
resultMaxVersion = Math.min(userScan.getMaxVersions(), scanInfo.getMaxVersions());
}
+ maxVersionToCheck = userScan.hasFilter() ? scanInfo.getMaxVersions() : resultMaxVersion;
}
+ // With a filter, the column trackers below admit up to the family's max versions.
DeleteTracker deleteTracker;
if (scanInfo.isNewVersionBehavior() && (userScan == null || !userScan.isRaw())) {
deleteTracker = new NewVersionBehaviorTracker(columns, scanInfo.getMinVersions(),
@@ -382,11 +385,11 @@ public abstract class ScanQueryMatcher implements ShipperListener {
if (deleteTracker instanceof NewVersionBehaviorTracker) {
columnTracker = (NewVersionBehaviorTracker) deleteTracker;
} else if (columns == null || columns.size() == 0) {
- columnTracker = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), resultMaxVersion,
+ columnTracker = new ScanWildcardColumnTracker(scanInfo.getMinVersions(), maxVersionToCheck,
oldestUnexpiredTS);
} else {
columnTracker = new ExplicitColumnTracker(columns, scanInfo.getMinVersions(),
- resultMaxVersion, oldestUnexpiredTS);
+ maxVersionToCheck, oldestUnexpiredTS);
}
return new Pair<>(deleteTracker, columnTracker);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dd24c52/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java
index a73cc0b..9f0a461 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/ScanWildcardColumnTracker.java
@@ -36,8 +36,9 @@ import org.apache.hadoop.hbase.util.Bytes;
public class ScanWildcardColumnTracker implements ColumnTracker {
private Cell columnCell = null;
private int currentCount = 0;
- private int maxVersions;
- private int minVersions;
+ private final int maxVersions;
+ private final int minVersions;
+ // The two version bounds above are final: assigned once and never changed afterwards.
/*
* Keeps track of the latest timestamp and type included for current column. Used to eliminate
* duplicates.
@@ -74,7 +75,6 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
@Override
public ScanQueryMatcher.MatchCode checkVersions(Cell cell, long timestamp, byte type,
boolean ignoreCount) throws IOException {
-
if (columnCell == null) {
// first iteration.
resetCell(cell);
@@ -143,7 +143,6 @@ public class ScanWildcardColumnTracker implements ColumnTracker {
} else {
return MatchCode.SEEK_NEXT_COL;
}
-
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dd24c52/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java
index 250a4a3..1debb5e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/querymatcher/UserScanQueryMatcher.java
@@ -22,6 +22,7 @@ import java.util.NavigableSet;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
@@ -52,6 +53,12 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher {
protected final TimeRange tr;
+ private final int versionsAfterFilter;
+ // Versions of curColCell's row/column included so far; only maintained when a filter is set.
+ private int count = 0;
+ // Cell identifying the row/column being counted; re-copied in beforeShipped().
+ private Cell curColCell = null;
+
private static Cell createStartKey(Scan scan, ScanInfo scanInfo) {
if (scan.includeStartRow()) {
return createStartKeyFromRow(scan.getStartRow(), scanInfo);
@@ -65,6 +72,13 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher {
super(createStartKey(scan, scanInfo), scanInfo, columns, oldestUnexpiredTS, now);
this.hasNullColumn = hasNullColumn;
this.filter = scan.getFilter();
+ if (this.filter != null) {
+ this.versionsAfterFilter =
+ scan.isRaw() ? scan.getMaxVersions() : Math.min(scan.getMaxVersions(),
+ scanInfo.getMaxVersions());
+ } else {
+ this.versionsAfterFilter = 0;
+ }
this.stopRow = scan.getStopRow();
TimeRange timeRange = scan.getColumnFamilyTimeRange().get(scanInfo.getFamily());
if (timeRange == null) {
@@ -98,6 +112,14 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher {
}
}
+ @Override
+ public void beforeShipped() throws IOException {
+ super.beforeShipped();
+ if (curColCell != null) {
+ this.curColCell = KeyValueUtil.toNewKeyCell(this.curColCell);
+ }
+ }
+
protected final MatchCode matchColumn(Cell cell, long timestamp, byte typeByte)
throws IOException {
int tsCmp = tr.compare(timestamp);
@@ -108,57 +130,115 @@ public abstract class UserScanQueryMatcher extends ScanQueryMatcher {
return columns.getNextRowOrNextColumn(cell);
}
// STEP 1: Check if the column is part of the requested columns
- MatchCode colChecker = columns.checkColumn(cell, typeByte);
- if (colChecker != MatchCode.INCLUDE) {
- return colChecker;
+ MatchCode matchCode = columns.checkColumn(cell, typeByte);
+ if (matchCode != MatchCode.INCLUDE) {
+ return matchCode;
+ }
+ /*
+ * STEP 2: check the number of versions needed. This method call returns SKIP, SEEK_NEXT_COL,
+ * INCLUDE, INCLUDE_AND_SEEK_NEXT_COL, or INCLUDE_AND_SEEK_NEXT_ROW.
+ */
+ matchCode = columns.checkVersions(cell, timestamp, typeByte, false);
+ switch (matchCode) {
+ case SKIP:
+ return MatchCode.SKIP;
+ case SEEK_NEXT_COL:
+ return MatchCode.SEEK_NEXT_COL;
+ default:
+ // It means it is INCLUDE, INCLUDE_AND_SEEK_NEXT_COL or INCLUDE_AND_SEEK_NEXT_ROW.
+ assert matchCode == MatchCode.INCLUDE || matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_COL
+ || matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
+ break;
}
- ReturnCode filterResponse = ReturnCode.SKIP;
- // STEP 2: Yes, the column is part of the requested columns. Check if filter is present
- if (filter != null) {
- // STEP 3: Filter the key value and return if it filters out
- filterResponse = filter.filterKeyValue(cell);
- switch (filterResponse) {
- case SKIP:
+
+ return filter == null ? matchCode : mergeFilterResponse(cell, matchCode,
+ filter.filterKeyValue(cell));
+ }
+
+ /*
+ * Call this when scan has filter. Decide the desired behavior by checkVersions's MatchCode
+ * and filterKeyValue's ReturnCode. With a filter the column tracker counts up to the family's
+ * max versions, so cells skipped by the filter do not use up the user's versions. The user's
+ * limit (versionsAfterFilter) is then applied here to every INCLUDE* result: once it is
+ * exceeded for a column, the cell is dropped and SEEK_NEXT_COL (or SEEK_NEXT_ROW) is returned.
+ *
+ * ColumnChecker FilterResponse Desired behavior
+ * INCLUDE SKIP SKIP
+ * INCLUDE NEXT_COL SEEK_NEXT_COL or SEEK_NEXT_ROW
+ * INCLUDE NEXT_ROW SEEK_NEXT_ROW
+ * INCLUDE SEEK_NEXT_USING_HINT SEEK_NEXT_USING_HINT
+ * INCLUDE INCLUDE INCLUDE
+ * INCLUDE INCLUDE_AND_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL
+ * INCLUDE INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW
+ * INCLUDE_AND_SEEK_NEXT_COL SKIP SEEK_NEXT_COL
+ * INCLUDE_AND_SEEK_NEXT_COL NEXT_COL SEEK_NEXT_COL or SEEK_NEXT_ROW
+ * INCLUDE_AND_SEEK_NEXT_COL NEXT_ROW SEEK_NEXT_ROW
+ * INCLUDE_AND_SEEK_NEXT_COL SEEK_NEXT_USING_HINT SEEK_NEXT_USING_HINT
+ * INCLUDE_AND_SEEK_NEXT_COL INCLUDE INCLUDE_AND_SEEK_NEXT_COL
+ * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL
+ * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW
+ * INCLUDE_AND_SEEK_NEXT_ROW SKIP SEEK_NEXT_ROW
+ * INCLUDE_AND_SEEK_NEXT_ROW NEXT_COL SEEK_NEXT_ROW
+ * INCLUDE_AND_SEEK_NEXT_ROW NEXT_ROW SEEK_NEXT_ROW
+ * INCLUDE_AND_SEEK_NEXT_ROW SEEK_NEXT_USING_HINT SEEK_NEXT_USING_HINT
+ * INCLUDE_AND_SEEK_NEXT_ROW INCLUDE INCLUDE_AND_SEEK_NEXT_ROW
+ * INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW
+ * INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW
+ */
+ private final MatchCode mergeFilterResponse(Cell cell, MatchCode matchCode,
+ ReturnCode filterResponse) {
+ switch (filterResponse) {
+ case SKIP:
+ if (matchCode == MatchCode.INCLUDE) {
return MatchCode.SKIP;
- case NEXT_COL:
+ } else if (matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
+ return MatchCode.SEEK_NEXT_COL;
+ } else if (matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
+ return MatchCode.SEEK_NEXT_ROW;
+ }
+ break;
+ case NEXT_COL:
+ if (matchCode == MatchCode.INCLUDE || matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_COL) {
return columns.getNextRowOrNextColumn(cell);
- case NEXT_ROW:
+ } else if (matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW) {
return MatchCode.SEEK_NEXT_ROW;
- case SEEK_NEXT_USING_HINT:
- return MatchCode.SEEK_NEXT_USING_HINT;
- default:
- // It means it is either include or include and seek next
- break;
- }
+ }
+ break;
+ case NEXT_ROW:
+ return MatchCode.SEEK_NEXT_ROW;
+ case SEEK_NEXT_USING_HINT:
+ return MatchCode.SEEK_NEXT_USING_HINT;
+ case INCLUDE:
+ break;
+ case INCLUDE_AND_NEXT_COL:
+ if (matchCode == MatchCode.INCLUDE) {
+ matchCode = MatchCode.INCLUDE_AND_SEEK_NEXT_COL;
+ }
+ break;
+ case INCLUDE_AND_SEEK_NEXT_ROW:
+ matchCode = MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
+ break;
+ default:
+ throw new RuntimeException("UNEXPECTED");
}
- /*
- * STEP 4: Reaching this step means the column is part of the requested columns and either
- * the filter is null or the filter has returned INCLUDE or INCLUDE_AND_NEXT_COL response.
- * Now check the number of versions needed. This method call returns SKIP, INCLUDE,
- * INCLUDE_AND_SEEK_NEXT_ROW, INCLUDE_AND_SEEK_NEXT_COL.
- *
- * FilterResponse ColumnChecker Desired behavior
- * INCLUDE SKIP row has already been included, SKIP.
- * INCLUDE INCLUDE INCLUDE
- * INCLUDE INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL
- * INCLUDE INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW
- * INCLUDE_AND_SEEK_NEXT_COL SKIP row has already been included, SKIP.
- * INCLUDE_AND_SEEK_NEXT_COL INCLUDE INCLUDE_AND_SEEK_NEXT_COL
- * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_COL
- * INCLUDE_AND_SEEK_NEXT_COL INCLUDE_AND_SEEK_NEXT_ROW INCLUDE_AND_SEEK_NEXT_ROW
- *
- * In all the above scenarios, we return the column checker return value except for
- * FilterResponse (INCLUDE_AND_SEEK_NEXT_COL) and ColumnChecker(INCLUDE)
- */
- colChecker = columns.checkVersions(cell, timestamp, typeByte, false);
- if (filterResponse == ReturnCode.INCLUDE_AND_SEEK_NEXT_ROW) {
- if (colChecker != MatchCode.SKIP) {
- return MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
- }
- return MatchCode.SEEK_NEXT_ROW;
+
+ // It means it is INCLUDE, INCLUDE_AND_SEEK_NEXT_COL or INCLUDE_AND_SEEK_NEXT_ROW.
+ assert matchCode == MatchCode.INCLUDE || matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_COL
+ || matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW;
+
+ // Count every included version of the current column against the user's limit, including
+ // INCLUDE_AND_SEEK_NEXT_COL/ROW; otherwise those cells could exceed versionsAfterFilter.
+ if (curColCell == null || !CellUtil.matchingRowColumn(cell, curColCell)) {
+ count = 0;
+ curColCell = cell;
}
- return (filterResponse == ReturnCode.INCLUDE_AND_NEXT_COL && colChecker == MatchCode.INCLUDE)
- ? MatchCode.INCLUDE_AND_SEEK_NEXT_COL : colChecker;
+ count += 1;
+ if (count > versionsAfterFilter) {
+ // Drop the cell. If we were moving to the next row anyway, seek there directly.
+ return matchCode == MatchCode.INCLUDE_AND_SEEK_NEXT_ROW ? MatchCode.SEEK_NEXT_ROW
+ : MatchCode.SEEK_NEXT_COL;
+ }
+ return matchCode;
}
protected abstract boolean isGet();
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dd24c52/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
index a93fbb2..8a3841e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide.java
@@ -88,6 +88,8 @@ import org.apache.hadoop.hbase.filter.QualifierFilter;
import org.apache.hadoop.hbase.filter.RegexStringComparator;
import org.apache.hadoop.hbase.filter.RowFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.SubstringComparator;
+import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
import org.apache.hadoop.hbase.io.hfile.BlockCache;
@@ -6424,4 +6426,200 @@ public class TestFromClientSide {
}
}
+ @Test
+ public void testDeleteSpecifiedVersionOfSpecifiedColumn() throws Exception {
+ Admin admin = TEST_UTIL.getAdmin();
+ final TableName tableName = TableName.valueOf(name.getMethodName());
+
+ byte[][] VALUES = makeN(VALUE, 5);
+ long[] ts = { 1000, 2000, 3000, 4000, 5000 };
+
+ Table ht = TEST_UTIL.createTable(tableName, FAMILY, 5);
+
+ Put put = new Put(ROW);
+ // Put version 1000,2000,3000,4000 of column FAMILY:QUALIFIER
+ for (int t = 0; t < 4; t++) {
+ put.addColumn(FAMILY, QUALIFIER, ts[t], VALUES[t]);
+ }
+ ht.put(put);
+
+ Delete delete = new Delete(ROW);
+ // Delete version 3000 of column FAMILY:QUALIFIER
+ delete.addColumn(FAMILY, QUALIFIER, ts[2]);
+ ht.delete(delete);
+
+ Get get = new Get(ROW);
+ get.addColumn(FAMILY, QUALIFIER);
+ get.readAllVersions();
+ Result result = ht.get(get);
+ // verify version 1000,2000,4000 remains for column FAMILY:QUALIFIER
+ assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0], ts[1], ts[3] }, new byte[][] {
+ VALUES[0], VALUES[1], VALUES[3] }, 0, 2);
+
+ delete = new Delete(ROW);
+ // Delete a version 5000 of column FAMILY:QUALIFIER which didn't exist
+ delete.addColumn(FAMILY, QUALIFIER, ts[4]);
+ ht.delete(delete);
+
+ get = new Get(ROW);
+ get.addColumn(FAMILY, QUALIFIER);
+ get.readAllVersions();
+ result = ht.get(get);
+ // verify version 1000,2000,4000 remains for column FAMILY:QUALIFIER
+ assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0], ts[1], ts[3] }, new byte[][] {
+ VALUES[0], VALUES[1], VALUES[3] }, 0, 2);
+
+ ht.close();
+ admin.close();
+ }
+
+ @Test
+ public void testDeleteLatestVersionOfSpecifiedColumn() throws Exception {
+ Admin admin = TEST_UTIL.getAdmin();
+ final TableName tableName = TableName.valueOf(name.getMethodName());
+
+ byte[][] VALUES = makeN(VALUE, 5);
+ long[] ts = { 1000, 2000, 3000, 4000, 5000 };
+
+ Table ht = TEST_UTIL.createTable(tableName, FAMILY, 5);
+
+ Put put = new Put(ROW);
+ // Put version 1000,2000,3000,4000 of column FAMILY:QUALIFIER
+ for (int t = 0; t < 4; t++) {
+ put.addColumn(FAMILY, QUALIFIER, ts[t], VALUES[t]);
+ }
+ ht.put(put);
+
+ Delete delete = new Delete(ROW);
+ // Delete latest version of column FAMILY:QUALIFIER
+ delete.addColumn(FAMILY, QUALIFIER);
+ ht.delete(delete);
+
+ Get get = new Get(ROW);
+ get.addColumn(FAMILY, QUALIFIER);
+ get.readAllVersions();
+ Result result = ht.get(get);
+ // verify version 1000,2000,3000 remains for column FAMILY:QUALIFIER
+ assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0], ts[1], ts[2] }, new byte[][] {
+ VALUES[0], VALUES[1], VALUES[2] }, 0, 2);
+
+ delete = new Delete(ROW);
+ // Delete two latest versions of column FAMILY:QUALIFIER
+ delete.addColumn(FAMILY, QUALIFIER);
+ delete.addColumn(FAMILY, QUALIFIER);
+ ht.delete(delete);
+
+ get = new Get(ROW);
+ get.addColumn(FAMILY, QUALIFIER);
+ get.readAllVersions();
+ result = ht.get(get);
+ // verify version 1000 remains for column FAMILY:QUALIFIER
+ assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0] }, new byte[][] { VALUES[0] },
+ 0, 0);
+
+ put = new Put(ROW);
+ // Put a version 5000 of column FAMILY:QUALIFIER
+ put.addColumn(FAMILY, QUALIFIER, ts[4], VALUES[4]);
+ ht.put(put);
+
+ get = new Get(ROW);
+ get.addColumn(FAMILY, QUALIFIER);
+ get.readAllVersions();
+ result = ht.get(get);
+ // verify version 1000,5000 remains for column FAMILY:QUALIFIER
+ assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[0], ts[4] }, new byte[][] {
+ VALUES[0], VALUES[4] }, 0, 1);
+
+ ht.close();
+ admin.close();
+ }
+
+ /**
+ * Test for HBASE-17125
+ */
+ @Test
+ public void testReadWithFilter() throws Exception {
+ Admin admin = TEST_UTIL.getAdmin();
+ final TableName tableName = TableName.valueOf(name.getMethodName());
+ Table table = TEST_UTIL.createTable(tableName, FAMILY, 3);
+
+ byte[] VALUEA = Bytes.toBytes("value-a");
+ byte[] VALUEB = Bytes.toBytes("value-b");
+ long[] ts = { 1000, 2000, 3000, 4000 };
+
+ Put put = new Put(ROW);
+ // Put version 1000,2000,3000,4000 of column FAMILY:QUALIFIER
+ for (int t = 0; t <= 3; t++) {
+ if (t <= 1) {
+ put.addColumn(FAMILY, QUALIFIER, ts[t], VALUEA);
+ } else {
+ put.addColumn(FAMILY, QUALIFIER, ts[t], VALUEB);
+ }
+ }
+ table.put(put);
+
+ Scan scan =
+ new Scan().setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value-a")))
+ .readVersions(3);
+ ResultScanner scanner = table.getScanner(scan);
+ Result result = scanner.next();
+ scanner.close();
+ // ts[0] has gone from user view, so only ts[1] (value-a) matches the filter
+ assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[1] }, new byte[][] { VALUEA }, 0,
+ 0);
+
+ Get get =
+ new Get(ROW)
+ .setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value-a")))
+ .readVersions(3);
+ result = table.get(get);
+ // ts[0] has gone from user view, so only ts[1] (value-a) matches the filter
+ assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[1] }, new byte[][] { VALUEA }, 0,
+ 0);
+
+ // Test with max versions 1, it should still read ts[1]
+ scan =
+ new Scan().setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value-a")))
+ .readVersions(1);
+ scanner = table.getScanner(scan);
+ result = scanner.next();
+ scanner.close();
+ // ts[0] has gone from user view, so only ts[1] (value-a) matches the filter
+ assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[1] }, new byte[][] { VALUEA }, 0,
+ 0);
+
+ // Test with max versions 1, it should still read ts[1]
+ get =
+ new Get(ROW)
+ .setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value-a")))
+ .readVersions(1);
+ result = table.get(get);
+ // ts[0] has gone from user view, so only ts[1] (value-a) matches the filter
+ assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[1] }, new byte[][] { VALUEA }, 0,
+ 0);
+
+ // Test with max versions 5, it should still read ts[1]
+ scan =
+ new Scan().setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value-a")))
+ .readVersions(5);
+ scanner = table.getScanner(scan);
+ result = scanner.next();
+ scanner.close();
+ // ts[0] has gone from user view, so only ts[1] (value-a) matches the filter
+ assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[1] }, new byte[][] { VALUEA }, 0,
+ 0);
+
+ // Test with max versions 5, it should still read ts[1]
+ get =
+ new Get(ROW)
+ .setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value-a")))
+ .readVersions(5);
+ result = table.get(get);
+ // ts[0] has gone from user view, so only ts[1] (value-a) matches the filter
+ assertNResult(result, ROW, FAMILY, QUALIFIER, new long[] { ts[1] }, new byte[][] { VALUEA }, 0,
+ 0);
+
+ table.close();
+ admin.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dd24c52/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 9db7c16..b8020d8 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -123,6 +123,8 @@ import org.apache.hadoop.hbase.filter.NullComparator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueExcludeFilter;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
+import org.apache.hadoop.hbase.filter.SubstringComparator;
+import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandler;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
@@ -2638,6 +2640,68 @@ public class TestHRegion {
}
}
+ @Test
+ public void testGetWithFilter() throws IOException, InterruptedException {
+ byte[] row1 = Bytes.toBytes("row1");
+ byte[] fam1 = Bytes.toBytes("fam1");
+ byte[] col1 = Bytes.toBytes("col1");
+ byte[] value1 = Bytes.toBytes("value1");
+ byte[] value2 = Bytes.toBytes("value2");
+
+ final int maxVersions = 3;
+ HColumnDescriptor hcd = new HColumnDescriptor(fam1);
+ hcd.setMaxVersions(maxVersions);
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("testFilterAndColumnTracker"));
+ htd.addFamily(hcd);
+ ChunkCreator.initialize(MemStoreLABImpl.CHUNK_SIZE_DEFAULT, false, 0, 0, 0, null);
+ HRegionInfo info = new HRegionInfo(htd.getTableName(), null, null, false);
+ Path logDir = TEST_UTIL.getDataTestDirOnTestFS(method + ".log");
+ final WAL wal = HBaseTestingUtility.createWal(TEST_UTIL.getConfiguration(), logDir, info);
+ this.region = TEST_UTIL.createLocalHRegion(info, htd, wal);
+
+ try {
+ // Put 4 versions into the memstore
+ long ts = 0;
+ Put put = new Put(row1, ts);
+ put.addColumn(fam1, col1, value1);
+ region.put(put);
+ put = new Put(row1, ts + 1);
+ put.addColumn(fam1, col1, Bytes.toBytes("filter1"));
+ region.put(put);
+ put = new Put(row1, ts + 2);
+ put.addColumn(fam1, col1, Bytes.toBytes("filter2"));
+ region.put(put);
+ put = new Put(row1, ts + 3);
+ put.addColumn(fam1, col1, value2);
+ region.put(put);
+
+ Get get = new Get(row1);
+ get.setMaxVersions();
+ Result res = region.get(get);
+ // Get 3 versions, the oldest version has gone from user view
+ assertEquals(maxVersions, res.size());
+
+ get.setFilter(new ValueFilter(CompareOp.EQUAL, new SubstringComparator("value")));
+ res = region.get(get);
+ // With the value filter, the oldest version should still be gone from user view, so
+ // only one cell (value2 at ts + 3) should be returned
+ assertEquals(1, res.size());
+ assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
+ assertEquals(ts + 3, res.rawCells()[0].getTimestamp());
+
+ region.flush(true);
+ region.compact(true);
+ Thread.sleep(1000);
+ res = region.get(get);
+ // After flush and compact, the result should be consistent with previous result
+ assertEquals(1, res.size());
+ assertTrue(CellUtil.matchingValue(new KeyValue(row1, fam1, col1, value2), res.rawCells()[0]));
+ } finally {
+ HBaseTestingUtility.closeRegionAndWAL(this.region);
+ this.region = null;
+ }
+ }
+
// ////////////////////////////////////////////////////////////////////////////
// Scanner tests
// ////////////////////////////////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dd24c52/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java
index 52b5a40..e8d60e6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestMinVersions.java
@@ -431,24 +431,27 @@ public class TestMinVersions {
tss.add(ts-1);
tss.add(ts-2);
+ // Should only get T2: max versions is 2, so T1 is gone from user view.
Get g = new Get(T1);
g.addColumn(c1,c1);
g.setFilter(new TimestampsFilter(tss));
g.setMaxVersions();
Result r = region.get(g);
- checkResult(r, c1, T2,T1);
+ checkResult(r, c1, T2);
+ // Should only get T2: max versions is 2, so T1 is gone from user view.
g = new Get(T1);
g.addColumn(c0,c0);
g.setFilter(new TimestampsFilter(tss));
g.setMaxVersions();
r = region.get(g);
- checkResult(r, c0, T2,T1);
+ checkResult(r, c0, T2);
// now flush/compact
region.flush(true);
region.compact(true);
+ // After flush/compact, the result should be consistent with previous result
g = new Get(T1);
g.addColumn(c1,c1);
g.setFilter(new TimestampsFilter(tss));
@@ -456,6 +459,7 @@ public class TestMinVersions {
r = region.get(g);
checkResult(r, c1, T2);
+ // After flush/compact, the result should be consistent with previous result
g = new Get(T1);
g.addColumn(c0,c0);
g.setFilter(new TimestampsFilter(tss));
http://git-wip-us.apache.org/repos/asf/hbase/blob/4dd24c52/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
index 1e3c84c..537601d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestStore.java
@@ -1067,10 +1067,11 @@ public class TestStore {
@Test
public void testFlushBeforeCompletingScanWoFilter() throws IOException, InterruptedException {
final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
+ final int expectedSize = 3;
testFlushBeforeCompletingScan(new MyListHook() {
@Override
public void hook(int currentSize) {
- if (currentSize == 2) {
+ if (currentSize == expectedSize - 1) {
try {
flushStore(store, id++);
timeToGoNextRow.set(true);
@@ -1084,16 +1085,17 @@ public class TestStore {
public Filter.ReturnCode filterKeyValue(Cell v) throws IOException {
return ReturnCode.INCLUDE;
}
- });
+ }, expectedSize);
}
@Test
public void testFlushBeforeCompletingScanWithFilter() throws IOException, InterruptedException {
final AtomicBoolean timeToGoNextRow = new AtomicBoolean(false);
+ final int expectedSize = 2;
testFlushBeforeCompletingScan(new MyListHook() {
@Override
public void hook(int currentSize) {
- if (currentSize == 2) {
+ if (currentSize == expectedSize - 1) {
try {
flushStore(store, id++);
timeToGoNextRow.set(true);
@@ -1112,16 +1114,17 @@ public class TestStore {
return ReturnCode.INCLUDE;
}
}
- });
+ }, expectedSize);
}
@Test
public void testFlushBeforeCompletingScanWithFilterHint() throws IOException, InterruptedException {
final AtomicBoolean timeToGetHint = new AtomicBoolean(false);
+ final int expectedSize = 2;
testFlushBeforeCompletingScan(new MyListHook() {
@Override
public void hook(int currentSize) {
- if (currentSize == 2) {
+ if (currentSize == expectedSize - 1) {
try {
flushStore(store, id++);
timeToGetHint.set(true);
@@ -1144,10 +1147,10 @@ public class TestStore {
public Cell getNextCellHint(Cell currentCell) throws IOException {
return currentCell;
}
- });
+ }, expectedSize);
}
- private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter)
+ private void testFlushBeforeCompletingScan(MyListHook hook, Filter filter, int expectedSize)
throws IOException, InterruptedException {
Configuration conf = HBaseConfiguration.create();
HColumnDescriptor hcd = new HColumnDescriptor(family);
@@ -1188,7 +1191,7 @@ public class TestStore {
scan, null, seqId + 3)){
// r1
scanner.next(myList);
- assertEquals(3, myList.size());
+ assertEquals(expectedSize, myList.size());
for (Cell c : myList) {
byte[] actualValue = CellUtil.cloneValue(c);
assertTrue("expected:" + Bytes.toStringBinary(value1)