Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/15 13:08:38 UTC

hbase git commit: HBASE-20457 Return immediately for a scan rpc call when we want to switch from pread to stream

Repository: hbase
Updated Branches:
  refs/heads/master d2daada97 -> 26babcf01


HBASE-20457 Return immediately for a scan rpc call when we want to switch from pread to stream


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/26babcf0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/26babcf0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/26babcf0

Branch: refs/heads/master
Commit: 26babcf013de696b899d76a3c39434b794440d8d
Parents: d2daada
Author: zhangduo <zh...@apache.org>
Authored: Thu Apr 26 17:54:13 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Tue May 15 20:56:20 2018 +0800

----------------------------------------------------------------------
 .../RpcRetryingCallerWithReadReplicas.java      |   1 +
 .../hbase/regionserver/ScannerContext.java      |  27 ++--
 .../hadoop/hbase/regionserver/StoreScanner.java |  17 ++-
 .../regionserver/TestSwitchToStreamRead.java    | 141 +++++++++++++++++--
 4 files changed, 164 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
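
The idea behind the change: the pread-to-stream switch can only happen in the shipped() callback, which only runs after the scan rpc has returned to the client, so once a scanner has read more than preadMaxBytes it now flags the ScannerContext, and the next limit check ends the rpc early so that shipped() gets its chance to reopen the store files for streaming. Below is a self-contained sketch of that control flow in plain Java; the class and constants are illustrative stand-ins, not the real HBase code.

public final class EarlyReturnSketch {

  static final long PREAD_MAX_BYTES = 4 * 1024; // assumed switch threshold
  static final int CELLS_PER_HEARTBEAT_CHECK = 1000;

  static boolean returnImmediately; // stands in for the flag added to ScannerContext

  // Mirrors the patched check: the flag alone is enough to report the limit reached.
  static boolean checkTimeLimit(long timeLimitMillis) {
    boolean hasTimeLimit = timeLimitMillis > 0 || returnImmediately;
    return hasTimeLimit
        && (returnImmediately || System.currentTimeMillis() >= timeLimitMillis);
  }

  public static void main(String[] args) {
    long bytesRead = 0;
    int kvsScanned = 0;
    for (int cell = 0; cell < 1_000_000; cell++) { // pretend a filter drops every cell
      kvsScanned++;
      bytesRead += 100; // pretend each cell serializes to 100 bytes
      if (bytesRead > PREAD_MAX_BYTES) {
        returnImmediately = true; // the scanner asks the rpc to return
      }
      if (kvsScanned % CELLS_PER_HEARTBEAT_CHECK == 0 || bytesRead > PREAD_MAX_BYTES) {
        if (checkTimeLimit(-1)) { // no explicit time limit configured
          System.out.println("returning early after " + kvsScanned + " cells");
          return; // the rpc returns; shipped() can now reopen readers for stream
        }
      }
    }
    System.out.println("scanned everything before returning");
  }
}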


http://git-wip-us.apache.org/repos/asf/hbase/blob/26babcf0/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 4a31cff..a0be0bf 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -279,6 +279,7 @@ public class RpcRetryingCallerWithReadReplicas {
       throws RetriesExhaustedException, DoNotRetryIOException {
     Throwable t = e.getCause();
     assert t != null; // That's what ExecutionException is about: holding an exception
+    t.printStackTrace();
 
     if (t instanceof RetriesExhaustedException) {
       throw (RetriesExhaustedException) t;

http://git-wip-us.apache.org/repos/asf/hbase/blob/26babcf0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index 10f9b24..cc6ec84 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -99,6 +99,12 @@ public class ScannerContext {
 
   private Cell lastPeekedCell = null;
 
+  // Setting this to true has the same effect as reaching the time limit.
+  // It is used when we want the current RSRpcServices.scan call to return immediately. For
+  // example, when we want to switch from pread to stream, we can only do it after the rpc
+  // call has returned.
+  private boolean returnImmediately;
+
   /**
    * Tracks the relevant server side metrics during scans. null when metrics should not be tracked
    */
@@ -247,7 +253,8 @@ public class ScannerContext {
    * @return true if the time limit can be enforced in the checker's scope
    */
   boolean hasTimeLimit(LimitScope checkerScope) {
-    return limits.canEnforceTimeLimitFromScope(checkerScope) && limits.getTime() > 0;
+    return limits.canEnforceTimeLimitFromScope(checkerScope) &&
+      (limits.getTime() > 0 || returnImmediately);
   }
 
   /**
@@ -307,7 +314,8 @@ public class ScannerContext {
    * @return true when the limit is enforceable from the checker's scope and it has been reached
    */
   boolean checkTimeLimit(LimitScope checkerScope) {
-    return hasTimeLimit(checkerScope) && (System.currentTimeMillis() >= limits.getTime());
+    return hasTimeLimit(checkerScope) &&
+      (returnImmediately || System.currentTimeMillis() >= limits.getTime());
   }
 
   /**
@@ -327,6 +335,10 @@ public class ScannerContext {
     this.lastPeekedCell = lastPeekedCell;
   }
 
+  void returnImmediately() {
+    this.returnImmediately = true;
+  }
+
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
@@ -539,11 +551,6 @@ public class ScannerContext {
     LimitFields() {
     }
 
-    LimitFields(int batch, LimitScope sizeScope, long size, long heapSize, LimitScope timeScope,
-        long time) {
-      setFields(batch, sizeScope, size, heapSize, timeScope, time);
-    }
-
     void copy(LimitFields limitsToCopy) {
       if (limitsToCopy != null) {
         setFields(limitsToCopy.getBatch(), limitsToCopy.getSizeScope(), limitsToCopy.getDataSize(),
@@ -691,12 +698,6 @@ public class ScannerContext {
     // such AND data cells of Cells which are in on heap area.
     long heapSize = DEFAULT_SIZE;
 
-    /**
-     * Fields keep their default values.
-     */
-    ProgressFields() {
-    }
-
     ProgressFields(int batch, long size, long heapSize) {
       setFields(batch, size, heapSize);
     }

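Taken together, the two ScannerContext changes reuse the existing TIME_LIMIT_REACHED return path instead of adding a new limit type: a non-positive time limit still never fires on its own, while the returnImmediately flag both enables the limit and trips it. A tiny standalone check of those semantics (run with java -ea; time and flag stand in for limits.getTime() and returnImmediately):

public final class LimitCheckSketch {

  static boolean hasTimeLimit(long time, boolean flag) {
    return time > 0 || flag; // the LimitScope check is omitted for brevity
  }

  static boolean checkTimeLimit(long time, boolean flag, long now) {
    return hasTimeLimit(time, flag) && (flag || now >= time);
  }

  public static void main(String[] args) {
    long now = System.currentTimeMillis();
    assert !checkTimeLimit(-1, false, now);     // no limit, no flag: scan keeps going
    assert checkTimeLimit(-1, true, now);       // the flag alone forces an early return
    assert checkTimeLimit(now - 1, false, now); // a real, expired time limit still fires
    System.out.println("semantics hold");
  }
}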
http://git-wip-us.apache.org/repos/asf/hbase/blob/26babcf0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index fa9a3de..f4cc24d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -553,7 +553,10 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
 
     LOOP: do {
       // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
-      if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
+      // Or check whether preadMaxBytes has been reached, in which case we want to return early
+      // so we can switch to stream read in the shipped method below.
+      if (kvsScanned % cellsPerHeartbeatCheck == 0 || (scanUsePread &&
+        scan.getReadType() == Scan.ReadType.DEFAULT && bytesRead > preadMaxBytes)) {
         if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
           return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
         }
@@ -565,6 +568,18 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
       checkScanOrder(prevCell, cell, comparator);
       int cellSize = PrivateCellUtil.estimatedSerializedSizeOf(cell);
       bytesRead += cellSize;
+      if (scanUsePread && scan.getReadType() == Scan.ReadType.DEFAULT &&
+        bytesRead > preadMaxBytes) {
+        // Return immediately if we want to switch from pread to stream. We need this because we
+        // can only switch in the shipped method: if the user's filter filters out everything and
+        // the rpc timeout is very large, the shipped method will not be called until the whole
+        // scan is finished, but by then we will already have scanned all the data...
+        // See HBASE-20457 for more details.
+        // There is still one scenario that cannot be handled: if we have a very large row with
+        // millions of qualifiers, and filter.filterRow is used, then even if we set the flag
+        // here we still need to scan all the qualifiers before returning...
+        scannerContext.returnImmediately();
+      }
       prevCell = cell;
       scannerContext.setLastPeekedCell(cell);
       topChanged = false;

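The StoreScanner side of the change boils down to one predicate: the limit check now also runs when the pread byte budget is exhausted on a Scan.ReadType.DEFAULT scan, and that same condition sets the flag so the check is guaranteed to fire. A standalone sketch of the trigger (field names mirror the diff; the values in main are illustrative):

public final class SwitchTriggerSketch {

  enum ReadType { DEFAULT, PREAD, STREAM }

  static boolean shouldCheckLimits(int kvsScanned, int cellsPerHeartbeatCheck,
      boolean scanUsePread, ReadType readType, long bytesRead, long preadMaxBytes) {
    boolean wantSwitchToStream =
        scanUsePread && readType == ReadType.DEFAULT && bytesRead > preadMaxBytes;
    return kvsScanned % cellsPerHeartbeatCheck == 0 || wantSwitchToStream;
  }

  public static void main(String[] args) {
    // A scan pinned to ReadType.PREAD never triggers the switch; DEFAULT does once over budget.
    System.out.println(shouldCheckLimits(7, 1000, true, ReadType.PREAD, 9999, 4096));   // false
    System.out.println(shouldCheckLimits(7, 1000, true, ReadType.DEFAULT, 9999, 4096)); // true
  }
}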
http://git-wip-us.apache.org/repos/asf/hbase/blob/26babcf0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
index 0af2970..815643d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSwitchToStreamRead.java
@@ -34,13 +34,17 @@ import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
+import org.apache.hadoop.hbase.regionserver.ScannerContext.LimitScope;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -49,7 +53,7 @@ public class TestSwitchToStreamRead {
 
   @ClassRule
   public static final HBaseClassTestRule CLASS_RULE =
-      HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);
+    HBaseClassTestRule.forClass(TestSwitchToStreamRead.class);
 
   private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
 
@@ -73,18 +77,18 @@ public class TestSwitchToStreamRead {
     VALUE_PREFIX = sb.append("-").toString();
     REGION = UTIL.createLocalHRegion(
       TableDescriptorBuilder.newBuilder(TABLE_NAME)
-          .setColumnFamily(
-            ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
-          .build(),
+        .setColumnFamily(
+          ColumnFamilyDescriptorBuilder.newBuilder(FAMILY).setBlocksize(1024).build())
+        .build(),
       null, null);
     for (int i = 0; i < 900; i++) {
       REGION
-          .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
+        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
     }
     REGION.flush(true);
     for (int i = 900; i < 1000; i++) {
       REGION
-          .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
+        .put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(VALUE_PREFIX + i)));
     }
   }
 
@@ -97,8 +101,8 @@ public class TestSwitchToStreamRead {
   @Test
   public void test() throws IOException {
     try (RegionScannerImpl scanner = REGION.getScanner(new Scan())) {
-      StoreScanner storeScanner = (StoreScanner) (scanner)
-          .getStoreHeapForTesting().getCurrentForTesting();
+      StoreScanner storeScanner =
+        (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
       for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
         if (kvs instanceof StoreFileScanner) {
           StoreFileScanner sfScanner = (StoreFileScanner) kvs;
@@ -134,4 +138,125 @@ public class TestSwitchToStreamRead {
       assertFalse(sf.isReferencedInReads());
     }
   }
+
+  public static final class MatchLastRowKeyFilter extends FilterBase {
+
+    @Override
+    public boolean filterRowKey(Cell cell) throws IOException {
+      return Bytes.toInt(cell.getRowArray(), cell.getRowOffset()) != 999;
+    }
+  }
+
+  private void testFilter(Filter filter) throws IOException {
+    try (RegionScannerImpl scanner = REGION.getScanner(new Scan().setFilter(filter))) {
+      StoreScanner storeScanner =
+        (StoreScanner) (scanner).getStoreHeapForTesting().getCurrentForTesting();
+      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
+        if (kvs instanceof StoreFileScanner) {
+          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
+          // We start with pread, so the shared reader is used here.
+          assertTrue(sfScanner.getReader().shared);
+        }
+      }
+      List<Cell> cells = new ArrayList<>();
+      // Should return before finishing the scan, as we want to switch from pread to stream.
+      assertTrue(scanner.next(cells,
+        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
+      assertTrue(cells.isEmpty());
+      scanner.shipped();
+
+      for (KeyValueScanner kvs : storeScanner.getAllScannersForTesting()) {
+        if (kvs instanceof StoreFileScanner) {
+          StoreFileScanner sfScanner = (StoreFileScanner) kvs;
+          // We should have switched to stream read by now.
+          assertFalse(sfScanner.getReader().shared);
+        }
+      }
+      assertFalse(scanner.next(cells,
+        ScannerContext.newBuilder().setTimeLimit(LimitScope.BETWEEN_CELLS, -1).build()));
+      Result result = Result.create(cells);
+      assertEquals(VALUE_PREFIX + 999, Bytes.toString(result.getValue(FAMILY, QUAL)));
+      cells.clear();
+      scanner.shipped();
+    }
+    // make sure all scanners are closed.
+    for (HStoreFile sf : REGION.getStore(FAMILY).getStorefiles()) {
+      assertFalse(sf.isReferencedInReads());
+    }
+  }
+
+  // filterRowKey is implemented with different logic: we keep calling kvHeap.next until the row
+  // key changes, and there we can only use NoLimitScannerContext, so we cannot make the upper
+  // layer return immediately. Simply not using NoLimitScannerContext leads to an infinite loop.
+  // This needs more digging; the code is way too complicated...
+  @Ignore
+  @Test
+  public void testFilterRowKey() throws IOException {
+    testFilter(new MatchLastRowKeyFilter());
+  }
+
+  public static final class MatchLastRowCellNextColFilter extends FilterBase {
+
+    @Override
+    public ReturnCode filterCell(Cell c) throws IOException {
+      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
+        return ReturnCode.INCLUDE;
+      } else {
+        return ReturnCode.NEXT_COL;
+      }
+    }
+  }
+
+  @Test
+  public void testFilterCellNextCol() throws IOException {
+    testFilter(new MatchLastRowCellNextColFilter());
+  }
+
+  public static final class MatchLastRowCellNextRowFilter extends FilterBase {
+
+    @Override
+    public ReturnCode filterCell(Cell c) throws IOException {
+      if (Bytes.toInt(c.getRowArray(), c.getRowOffset()) == 999) {
+        return ReturnCode.INCLUDE;
+      } else {
+        return ReturnCode.NEXT_ROW;
+      }
+    }
+  }
+
+  @Test
+  public void testFilterCellNextRow() throws IOException {
+    testFilter(new MatchLastRowCellNextRowFilter());
+  }
+
+  public static final class MatchLastRowFilterRowFilter extends FilterBase {
+
+    private boolean exclude;
+
+    @Override
+    public void filterRowCells(List<Cell> kvs) throws IOException {
+      Cell c = kvs.get(0);
+      exclude = Bytes.toInt(c.getRowArray(), c.getRowOffset()) != 999;
+    }
+
+    @Override
+    public void reset() throws IOException {
+      exclude = false;
+    }
+
+    @Override
+    public boolean filterRow() throws IOException {
+      return exclude;
+    }
+
+    @Override
+    public boolean hasFilterRow() {
+      return true;
+    }
+  }
+
+  @Test
+  public void testFilterRow() throws IOException {
+    testFilter(new MatchLastRowFilterRowFilter());
+  }
 }
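
As a side note for callers: a client that already knows a scan will be large can sidestep the mid-scan switch entirely by pinning the read type when the scan is created. Scan.setReadType is part of the HBase 2.x client API; the table handle, the family name, and the hbase.storescanner.pread.max.bytes threshold mentioned here are assumptions for illustration, not part of this commit.

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public final class PinnedReadTypeExample {

  // Scans everything in one column family using stream read from the start, so the
  // server never has to cut an rpc short to switch readers.
  static void scanAll(Table table) throws java.io.IOException {
    Scan scan = new Scan()
        .addFamily(Bytes.toBytes("cf"))     // illustrative family name
        .setReadType(Scan.ReadType.STREAM); // or ReadType.PREAD to stay on pread
    try (ResultScanner rs = table.getScanner(scan)) {
      for (Result r : rs) {
        // process each row here
      }
    }
  }
}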