You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2018/09/20 14:21:59 UTC
hbase git commit: HBASE-21206 Scan with batch size may return
incomplete cells
Repository: hbase
Updated Branches:
refs/heads/branch-2.1 c5af7b654 -> 5a73a1ab2
HBASE-21206 Scan with batch size may return incomplete cells
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5a73a1ab
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5a73a1ab
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5a73a1ab
Branch: refs/heads/branch-2.1
Commit: 5a73a1ab250ce0d1e3ab73d3366887ad7826adf0
Parents: c5af7b6
Author: openinx <op...@gmail.com>
Authored: Tue Sep 18 16:45:45 2018 +0800
Committer: openinx <op...@gmail.com>
Committed: Thu Sep 20 22:20:02 2018 +0800
----------------------------------------------------------------------
.../hbase/regionserver/RSRpcServices.java | 20 ++++--
.../hbase/client/TestFromClientSide3.java | 75 ++++++++++++++++++++
2 files changed, 91 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/5a73a1ab/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 375b219..f6617a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -3168,6 +3168,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
break;
}
}
+ } else if (!moreRows && !results.isEmpty()) {
+ // No more cells for the scan here, we need to ensure that the mayHaveMoreCellsInRow of
+ // last result is false. Otherwise it's possible that: the first nextRaw returned
+ // because BATCH_LIMIT_REACHED (BTW it happens to exhaust all cells of the scan), so the
+ // last result's mayHaveMoreCellsInRow will be true, while the following nextRaw will
+ // return with moreRows=false, which means moreResultsInRegion would be false, it will
+ // be a contradictory state (HBASE-21206).
+ int lastIdx = results.size() - 1;
+ Result r = results.get(lastIdx);
+ if (r.mayHaveMoreCellsInRow()) {
+ results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
+ }
}
boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
@@ -3183,12 +3195,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
// there are more values to be read server side. If there aren't more values,
// marking it as a heartbeat is wasteful because the client will need to issue
// another ScanRequest only to realize that they already have all the values
- if (moreRows) {
+ if (moreRows && timeLimitReached) {
// Heartbeat messages occur when the time limit has been reached.
- builder.setHeartbeatMessage(timeLimitReached);
- if (timeLimitReached && rsh.needCursor) {
+ builder.setHeartbeatMessage(true);
+ if (rsh.needCursor) {
Cell cursorCell = scannerContext.getLastPeekedCell();
- if (cursorCell != null ) {
+ if (cursorCell != null) {
builder.setCursor(ProtobufUtil.toCursor(cursorCell));
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/5a73a1ab/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index 24591c0..fad4f45 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -33,6 +33,7 @@ import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
@@ -1052,4 +1054,77 @@ public class TestFromClientSide3 {
return s;
}
}
+
+ private static byte[] generateHugeValue(int size) {
+ Random rand = ThreadLocalRandom.current();
+ byte[] value = new byte[size];
+ for (int i = 0; i < value.length; i++) {
+ value[i] = (byte) rand.nextInt(256);
+ }
+ return value;
+ }
+
+ @Test
+ public void testScanWithBatchSizeReturnIncompleteCells() throws IOException {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
+ .build();
+
+ Table table = TEST_UTIL.createTable(hd, null);
+
+ Put put = new Put(ROW);
+ put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
+ table.put(put);
+
+ put = new Put(ROW);
+ put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024));
+ table.put(put);
+
+ for (int i = 2; i < 5; i++) {
+ for (int version = 0; version < 2; version++) {
+ put = new Put(ROW);
+ put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024));
+ table.put(put);
+ }
+ }
+
+ Scan scan = new Scan();
+ scan.withStartRow(ROW).withStopRow(ROW).addFamily(FAMILY).setBatch(3)
+ .setMaxResultSize(4 * 1024 * 1024);
+ Result result;
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ List<Result> list = new ArrayList<>();
+ /*
+ * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB; the second
+ * scan rpc should return a result with 3 cells, because it reaches the batch limit = 3. The
+ * mayHaveMoreCellsInRow in the last result should be false in the scan rpc. BTW, the
+ * moreResultsInRegion would also be false. Finally, the client should collect all the cells
+ * into two results: 2+3 -> 3+2.
+ */
+ while ((result = scanner.next()) != null) {
+ list.add(result);
+ }
+
+ Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
+ Assert.assertEquals(2, list.size());
+ Assert.assertEquals(3, list.get(0).size());
+ Assert.assertEquals(2, list.get(1).size());
+ }
+
+ scan = new Scan();
+ scan.withStartRow(ROW).withStopRow(ROW).addFamily(FAMILY).setBatch(2)
+ .setMaxResultSize(4 * 1024 * 1024);
+ try (ResultScanner scanner = table.getScanner(scan)) {
+ List<Result> list = new ArrayList<>();
+ while ((result = scanner.next()) != null) {
+ list.add(result);
+ }
+ Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
+ Assert.assertEquals(3, list.size());
+ Assert.assertEquals(2, list.get(0).size());
+ Assert.assertEquals(2, list.get(1).size());
+ Assert.assertEquals(1, list.get(2).size());
+ }
+ }
}