You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by op...@apache.org on 2018/09/20 14:21:59 UTC

hbase git commit: HBASE-21206 Scan with batch size may return incomplete cells

Repository: hbase
Updated Branches:
  refs/heads/branch-2.1 c5af7b654 -> 5a73a1ab2


HBASE-21206 Scan with batch size may return incomplete cells


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5a73a1ab
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5a73a1ab
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5a73a1ab

Branch: refs/heads/branch-2.1
Commit: 5a73a1ab250ce0d1e3ab73d3366887ad7826adf0
Parents: c5af7b6
Author: openinx <op...@gmail.com>
Authored: Tue Sep 18 16:45:45 2018 +0800
Committer: openinx <op...@gmail.com>
Committed: Thu Sep 20 22:20:02 2018 +0800

----------------------------------------------------------------------
 .../hbase/regionserver/RSRpcServices.java       | 20 ++++--
 .../hbase/client/TestFromClientSide3.java       | 75 ++++++++++++++++++++
 2 files changed, 91 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5a73a1ab/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 375b219..f6617a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -3168,6 +3168,18 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
                 break;
               }
             }
+          } else if (!moreRows && !results.isEmpty()) {
+            // No more cells for the scan here, so we need to ensure that mayHaveMoreCellsInRow of
+            // the last result is false. Otherwise it's possible that the first nextRaw returned
+            // because of BATCH_LIMIT_REACHED (and happened to exhaust all cells of the scan), so
+            // the last result's mayHaveMoreCellsInRow would be true, while the following nextRaw
+            // returns with moreRows=false, which means moreResultsInRegion would be false. That
+            // would be a contradictory state (HBASE-21206).
+            int lastIdx = results.size() - 1;
+            Result r = results.get(lastIdx);
+            if (r.mayHaveMoreCellsInRow()) {
+              results.set(lastIdx, Result.create(r.rawCells(), r.getExists(), r.isStale(), false));
+            }
           }
           boolean sizeLimitReached = scannerContext.checkSizeLimit(LimitScope.BETWEEN_ROWS);
           boolean timeLimitReached = scannerContext.checkTimeLimit(LimitScope.BETWEEN_ROWS);
@@ -3183,12 +3195,12 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
             // there are more values to be read server side. If there aren't more values,
             // marking it as a heartbeat is wasteful because the client will need to issue
             // another ScanRequest only to realize that they already have all the values
-            if (moreRows) {
+            if (moreRows && timeLimitReached) {
               // Heartbeat messages occur when the time limit has been reached.
-              builder.setHeartbeatMessage(timeLimitReached);
-              if (timeLimitReached && rsh.needCursor) {
+              builder.setHeartbeatMessage(true);
+              if (rsh.needCursor) {
                 Cell cursorCell = scannerContext.getLastPeekedCell();
-                if (cursorCell != null ) {
+                if (cursorCell != null) {
                   builder.setCursor(ProtobufUtil.toCursor(cursorCell));
                 }
               }

http://git-wip-us.apache.org/repos/asf/hbase/blob/5a73a1ab/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
index 24591c0..fad4f45 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestFromClientSide3.java
@@ -33,6 +33,7 @@ import java.util.Random;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.hadoop.conf.Configuration;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
@@ -1052,4 +1054,77 @@ public class TestFromClientSide3 {
       return s;
     }
   }
+
+  private static byte[] generateHugeValue(int size) {
+    Random rand = ThreadLocalRandom.current();
+    byte[] value = new byte[size];
+    for (int i = 0; i < value.length; i++) {
+      value[i] = (byte) rand.nextInt(256);
+    }
+    return value;
+  }
+
+  @Test
+  public void testScanWithBatchSizeReturnIncompleteCells() throws IOException {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    TableDescriptor hd = TableDescriptorBuilder.newBuilder(tableName)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setMaxVersions(3).build())
+        .build();
+
+    Table table = TEST_UTIL.createTable(hd, null);
+
+    Put put = new Put(ROW);
+    put.addColumn(FAMILY, Bytes.toBytes(0), generateHugeValue(3 * 1024 * 1024));
+    table.put(put);
+
+    put = new Put(ROW);
+    put.addColumn(FAMILY, Bytes.toBytes(1), generateHugeValue(4 * 1024 * 1024));
+    table.put(put);
+
+    for (int i = 2; i < 5; i++) {
+      for (int version = 0; version < 2; version++) {
+        put = new Put(ROW);
+        put.addColumn(FAMILY, Bytes.toBytes(i), generateHugeValue(1024));
+        table.put(put);
+      }
+    }
+
+    Scan scan = new Scan();
+    scan.withStartRow(ROW).withStopRow(ROW).addFamily(FAMILY).setBatch(3)
+        .setMaxResultSize(4 * 1024 * 1024);
+    Result result;
+    try (ResultScanner scanner = table.getScanner(scan)) {
+      List<Result> list = new ArrayList<>();
+      /*
+       * The first scan rpc should return a result with 2 cells, because 3MB + 4MB > 4MB. The second
+       * scan rpc should return a result with 3 cells, because it reaches the batch limit of 3. The
+       * mayHaveMoreCellsInRow of the last result should be false in that scan rpc, and
+       * moreResultsInRegion should also be false. Finally, the client should collect all the cells
+       * into two results: 2+3 -> 3+2.
+       */
+      while ((result = scanner.next()) != null) {
+        list.add(result);
+      }
+
+      Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
+      Assert.assertEquals(2, list.size());
+      Assert.assertEquals(3, list.get(0).size());
+      Assert.assertEquals(2, list.get(1).size());
+    }
+
+    scan = new Scan();
+    scan.withStartRow(ROW).withStopRow(ROW).addFamily(FAMILY).setBatch(2)
+        .setMaxResultSize(4 * 1024 * 1024);
+    try (ResultScanner scanner = table.getScanner(scan)) {
+      List<Result> list = new ArrayList<>();
+      while ((result = scanner.next()) != null) {
+        list.add(result);
+      }
+      Assert.assertEquals(5, list.stream().mapToInt(Result::size).sum());
+      Assert.assertEquals(3, list.size());
+      Assert.assertEquals(2, list.get(0).size());
+      Assert.assertEquals(2, list.get(1).size());
+      Assert.assertEquals(1, list.get(2).size());
+    }
+  }
 }