You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/29 12:35:30 UTC
[03/50] [abbrv] hbase git commit: HBASE-19818 Scan time limit not
work if the filter always filter row key
HBASE-19818 Scan time limit not work if the filter always filter row key
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c88e570d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c88e570d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c88e570d
Branch: refs/heads/HBASE-19397-branch-2
Commit: c88e570dc153a346e9e0ae9047bf6217a552928f
Parents: c16dae1
Author: Guanghao Zhang <zg...@apache.org>
Authored: Thu Jan 25 18:16:23 2018 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Fri Jan 26 17:06:00 2018 +0800
----------------------------------------------------------------------
.../hadoop/hbase/regionserver/HRegion.java | 36 ++++--
.../regionserver/NoLimitScannerContext.java | 12 +-
.../hbase/regionserver/ScannerContext.java | 118 +++++++++++++++++--
.../hadoop/hbase/regionserver/StoreScanner.java | 1 -
.../hbase/client/TestRawAsyncScanCursor.java | 10 +-
.../TestScannerHeartbeatMessages.java | 66 ++++++++---
6 files changed, 187 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/c88e570d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c0ccc1d..ae0f3d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6390,7 +6390,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
int initialBatchProgress = scannerContext.getBatchProgress();
long initialSizeProgress = scannerContext.getDataSizeProgress();
long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();
- long initialTimeProgress = scannerContext.getTimeProgress();
+
+ // Used to check time limit
+ LimitScope limitScope = LimitScope.BETWEEN_CELLS;
// The loop here is used only when at some point during the next we determine
// that due to effects of filters or otherwise, we have an empty row in the result.
@@ -6403,7 +6405,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (scannerContext.getKeepProgress()) {
// Progress should be kept. Reset to initial values seen at start of method invocation.
scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
- initialHeapSizeProgress, initialTimeProgress);
+ initialHeapSizeProgress);
} else {
scannerContext.clearProgress();
}
@@ -6442,6 +6444,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
+ limitScope = LimitScope.BETWEEN_ROWS;
+ }
+
+ if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
+ if (hasFilterRow) {
+ throw new IncompatibleFilterException(
+ "Filter whose hasFilterRow() returns true is incompatible with scans that must " +
+ " stop mid-row because of a limit. ScannerContext:" + scannerContext);
+ }
+ return true;
}
// Check if we were getting data from the joinedHeap and hit the limit.
@@ -6472,6 +6484,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
results.clear();
+
+ // Read nothing as the row key was filtered, but we still need to check the time limit
+ if (scannerContext.checkTimeLimit(limitScope)) {
+ return true;
+ }
continue;
}
@@ -6498,16 +6515,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
ret = filter.filterRowCellsWithRet(results);
// We don't know how the results have changed after being filtered. Must set progress
- // according to contents of results now. However, a change in the results should not
- // affect the time progress. Thus preserve whatever time progress has been made
- long timeProgress = scannerContext.getTimeProgress();
+ // according to contents of results now.
if (scannerContext.getKeepProgress()) {
scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
- initialHeapSizeProgress, initialTimeProgress);
+ initialHeapSizeProgress);
} else {
scannerContext.clearProgress();
}
- scannerContext.setTimeProgress(timeProgress);
scannerContext.incrementBatchProgress(results.size());
for (Cell cell : results) {
scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
@@ -6525,7 +6539,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// This row was totally filtered out, if this is NOT the last row,
// we should continue on. Otherwise, nothing else to do.
- if (!shouldStop) continue;
+ if (!shouldStop) {
+ // Read nothing as the cells were filtered, but we still need to check the time limit
+ if (scannerContext.checkTimeLimit(limitScope)) {
+ return true;
+ }
+ continue;
+ }
return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c88e570d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
index ef1fb68..26d8192 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
@@ -68,17 +68,7 @@ public class NoLimitScannerContext extends ScannerContext {
}
@Override
- void setTimeProgress(long timeProgress) {
- // Do nothing. NoLimitScannerContext instances are immutable post-construction
- }
-
- @Override
- void updateTimeProgress() {
- // Do nothing. NoLimitScannerContext instances are immutable post-construction
- }
-
- @Override
- void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress, long timeProgress) {
+ void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress) {
// Do nothing. NoLimitScannerContext instances are immutable post-construction
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c88e570d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index 6b2267f..94d0811 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -51,11 +51,13 @@ import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
@InterfaceStability.Evolving
public class ScannerContext {
+ LimitFields limits;
/**
- * Two sets of the same fields. One for the limits, another for the progress towards those limits
+ * A different set of progress fields. It only includes batch, dataSize and heapSize. Compared to
+ * LimitFields, ProgressFields doesn't contain a time field: as we save a deadline in LimitFields,
+ * we use {@link System#currentTimeMillis()} directly when checking the time limit.
*/
- LimitFields limits;
- LimitFields progress;
+ ProgressFields progress;
/**
* The state of the scanner after the invocation of {@link InternalScanner#next(java.util.List)}
@@ -104,10 +106,12 @@ public class ScannerContext {
ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) {
this.limits = new LimitFields();
- if (limitsToCopy != null) this.limits.copy(limitsToCopy);
+ if (limitsToCopy != null) {
+ this.limits.copy(limitsToCopy);
+ }
// Progress fields are initialized to 0
- progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, 0, LimitFields.DEFAULT_SCOPE, 0);
+ progress = new ProgressFields(0, 0, 0);
this.keepProgress = keepProgress;
this.scannerState = DEFAULT_STATE;
@@ -162,9 +166,10 @@ public class ScannerContext {
/**
* Update the time progress with {@link System#currentTimeMillis()}
+ * @deprecated will be removed in 3.0
*/
+ @Deprecated
void updateTimeProgress() {
- progress.setTime(System.currentTimeMillis());
}
int getBatchProgress() {
@@ -179,14 +184,25 @@ public class ScannerContext {
return progress.getHeapSize();
}
+ /**
+ * @deprecated will be removed in 3.0
+ */
+ @Deprecated
long getTimeProgress() {
- return progress.getTime();
+ return System.currentTimeMillis();
}
+ /**
+ * @deprecated will be removed in 3.0
+ */
+ @Deprecated
void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress, long timeProgress) {
+ setProgress(batchProgress, sizeProgress, heapSizeProgress);
+ }
+
+ void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress) {
setBatchProgress(batchProgress);
setSizeProgress(sizeProgress, heapSizeProgress);
- setTimeProgress(timeProgress);
}
void setSizeProgress(long dataSizeProgress, long heapSizeProgress) {
@@ -198,8 +214,11 @@ public class ScannerContext {
progress.setBatch(batchProgress);
}
+ /**
+ * @deprecated will be removed in 3.0
+ */
+ @Deprecated
void setTimeProgress(long timeProgress) {
- progress.setTime(timeProgress);
}
/**
@@ -207,7 +226,7 @@ public class ScannerContext {
* values
*/
void clearProgress() {
- progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, 0, LimitFields.DEFAULT_SCOPE, 0);
+ progress.setFields(0, 0, 0);
}
/**
@@ -319,7 +338,7 @@ public class ScannerContext {
* @return true when the limit is enforceable from the checker's scope and it has been reached
*/
boolean checkTimeLimit(LimitScope checkerScope) {
- return hasTimeLimit(checkerScope) && progress.getTime() >= limits.getTime();
+ return hasTimeLimit(checkerScope) && (System.currentTimeMillis() >= limits.getTime());
}
/**
@@ -690,4 +709,81 @@ public class ScannerContext {
return sb.toString();
}
}
+
+ private static class ProgressFields {
+
+ private static int DEFAULT_BATCH = -1;
+ private static long DEFAULT_SIZE = -1L;
+
+ // The batch limit will always be enforced between cells, thus, there isn't a field to hold the
+ // batch scope
+ int batch = DEFAULT_BATCH;
+
+ // The sum of cell data sizes (key + value). The Cell data might be in the on-heap or off-heap area.
+ long dataSize = DEFAULT_SIZE;
+ // The sum of heap space occupied by all tracked cells. This includes the Cell POJO's own
+ // overhead AND the data of those Cells which reside in the on-heap area.
+ long heapSize = DEFAULT_SIZE;
+
+ /**
+ * Fields keep their default values.
+ */
+ ProgressFields() {
+ }
+
+ ProgressFields(int batch, long size, long heapSize) {
+ setFields(batch, size, heapSize);
+ }
+
+ /**
+ * Set all fields together.
+ */
+ void setFields(int batch, long dataSize, long heapSize) {
+ setBatch(batch);
+ setDataSize(dataSize);
+ setHeapSize(heapSize);
+ }
+
+ int getBatch() {
+ return this.batch;
+ }
+
+ void setBatch(int batch) {
+ this.batch = batch;
+ }
+
+ long getDataSize() {
+ return this.dataSize;
+ }
+
+ long getHeapSize() {
+ return this.heapSize;
+ }
+
+ void setDataSize(long dataSize) {
+ this.dataSize = dataSize;
+ }
+
+ void setHeapSize(long heapSize) {
+ this.heapSize = heapSize;
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("{");
+
+ sb.append("batch:");
+ sb.append(batch);
+
+ sb.append(", dataSize:");
+ sb.append(dataSize);
+
+ sb.append(", heapSize:");
+ sb.append(heapSize);
+
+ sb.append("}");
+ return sb.toString();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c88e570d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 7e682f9..0b9b547 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -552,7 +552,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
LOOP: do {
// Update and check the time limit based on the configured value of cellsPerTimeoutCheck
if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
- scannerContext.updateTimeProgress();
if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/c88e570d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
index 0624a30..04ffd7b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
@@ -79,12 +80,9 @@ public class TestRawAsyncScanCursor extends AbstractTestScanCursor {
assertEquals(1, results.length);
assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS);
// we will always provide a scan cursor if time limit is reached.
- if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) {
- assertFalse(controller.cursor().isPresent());
- } else {
- assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1],
- controller.cursor().get().getRow());
- }
+ assertTrue(controller.cursor().isPresent());
+ assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1],
+ controller.cursor().get().getRow());
assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow());
count++;
} catch (Throwable e) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/c88e570d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
index fb02d9d..11cc365 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
@@ -144,12 +144,6 @@ public class TestScannerHeartbeatMessages {
/**
* Make puts to put the input value into each combination of row, family, and qualifier
- * @param rows
- * @param families
- * @param qualifiers
- * @param value
- * @return
- * @throws IOException
*/
static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
byte[] value) throws IOException {
@@ -189,8 +183,6 @@ public class TestScannerHeartbeatMessages {
* Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
* when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
* disabled, the test should throw an exception.
- * @param testCallable
- * @throws InterruptedException
*/
private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
HeartbeatRPCServices.heartbeatsEnabled = true;
@@ -217,7 +209,6 @@ public class TestScannerHeartbeatMessages {
/**
* Test the case that the time limit for the scan is reached after each full row of cells is
* fetched.
- * @throws Exception
*/
@Test
public void testHeartbeatBetweenRows() throws Exception {
@@ -239,7 +230,6 @@ public class TestScannerHeartbeatMessages {
/**
* Test the case that the time limit for scans is reached in between column families
- * @throws Exception
*/
@Test
public void testHeartbeatBetweenColumnFamilies() throws Exception {
@@ -263,7 +253,7 @@ public class TestScannerHeartbeatMessages {
});
}
- public static class SparseFilter extends FilterBase {
+ public static class SparseCellFilter extends FilterBase {
@Override
public ReturnCode filterCell(final Cell v) throws IOException {
@@ -277,23 +267,39 @@ public class TestScannerHeartbeatMessages {
}
public static Filter parseFrom(final byte[] pbBytes) {
- return new SparseFilter();
+ return new SparseCellFilter();
+ }
+ }
+
+ public static class SparseRowFilter extends FilterBase {
+
+ @Override
+ public boolean filterRowKey(Cell cell) throws IOException {
+ try {
+ Thread.sleep(CLIENT_TIMEOUT / 2 - 100);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]);
+ }
+
+ public static Filter parseFrom(final byte[] pbBytes) {
+ return new SparseRowFilter();
}
}
/**
* Test the case that there is a filter which filters most of cells
- * @throws Exception
*/
@Test
- public void testHeartbeatWithSparseFilter() throws Exception {
+ public void testHeartbeatWithSparseCellFilter() throws Exception {
testImportanceOfHeartbeats(new Callable<Void>() {
@Override
public Void call() throws Exception {
Scan scan = new Scan();
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
- scan.setFilter(new SparseFilter());
+ scan.setFilter(new SparseCellFilter());
ResultScanner scanner = TABLE.getScanner(scan);
int num = 0;
while (scanner.next() != null) {
@@ -305,7 +311,7 @@ public class TestScannerHeartbeatMessages {
scan = new Scan();
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
- scan.setFilter(new SparseFilter());
+ scan.setFilter(new SparseCellFilter());
scan.setAllowPartialResults(true);
scanner = TABLE.getScanner(scan);
num = 0;
@@ -321,6 +327,31 @@ public class TestScannerHeartbeatMessages {
}
/**
+ * Test the case that there is a filter which filters most of rows
+ */
+ @Test
+ public void testHeartbeatWithSparseRowFilter() throws Exception {
+ testImportanceOfHeartbeats(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ Scan scan = new Scan();
+ scan.setMaxResultSize(Long.MAX_VALUE);
+ scan.setCaching(Integer.MAX_VALUE);
+ scan.setFilter(new SparseRowFilter());
+ ResultScanner scanner = TABLE.getScanner(scan);
+ int num = 0;
+ while (scanner.next() != null) {
+ num++;
+ }
+ assertEquals(1, num);
+ scanner.close();
+
+ return null;
+ }
+ });
+ }
+
+ /**
* Test the equivalence of a scan versus the same scan executed when heartbeat messages are
* necessary
* @param scan The scan configuration being tested
@@ -328,7 +359,6 @@ public class TestScannerHeartbeatMessages {
* @param cfSleepTime The time to sleep between fetches of column family cells
* @param sleepBeforeCf set to true when column family sleeps should occur before the cells for
* that column family are fetched
- * @throws Exception
*/
private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
int cfSleepTime, boolean sleepBeforeCf) throws Exception {
@@ -361,8 +391,6 @@ public class TestScannerHeartbeatMessages {
/**
* Helper method for setting the time to sleep between rows and column families. If a sleep time
* is negative then that sleep will be disabled
- * @param rowSleepTime
- * @param cfSleepTime
*/
private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;