You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/29 12:35:30 UTC

[03/50] [abbrv] hbase git commit: HBASE-19818 Scan time limit not work if the filter always filter row key

HBASE-19818 Scan time limit not work if the filter always filter row key


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c88e570d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c88e570d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c88e570d

Branch: refs/heads/HBASE-19397-branch-2
Commit: c88e570dc153a346e9e0ae9047bf6217a552928f
Parents: c16dae1
Author: Guanghao Zhang <zg...@apache.org>
Authored: Thu Jan 25 18:16:23 2018 +0800
Committer: Guanghao Zhang <zg...@apache.org>
Committed: Fri Jan 26 17:06:00 2018 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/HRegion.java      |  36 ++++--
 .../regionserver/NoLimitScannerContext.java     |  12 +-
 .../hbase/regionserver/ScannerContext.java      | 118 +++++++++++++++++--
 .../hadoop/hbase/regionserver/StoreScanner.java |   1 -
 .../hbase/client/TestRawAsyncScanCursor.java    |  10 +-
 .../TestScannerHeartbeatMessages.java           |  66 ++++++++---
 6 files changed, 187 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/c88e570d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index c0ccc1d..ae0f3d1 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -6390,7 +6390,9 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
       int initialBatchProgress = scannerContext.getBatchProgress();
       long initialSizeProgress = scannerContext.getDataSizeProgress();
       long initialHeapSizeProgress = scannerContext.getHeapSizeProgress();
-      long initialTimeProgress = scannerContext.getTimeProgress();
+
+      // Used to check time limit
+      LimitScope limitScope = LimitScope.BETWEEN_CELLS;
 
       // The loop here is used only when at some point during the next we determine
       // that due to effects of filters or otherwise, we have an empty row in the result.
@@ -6403,7 +6405,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
         if (scannerContext.getKeepProgress()) {
           // Progress should be kept. Reset to initial values seen at start of method invocation.
           scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
-              initialHeapSizeProgress, initialTimeProgress);
+              initialHeapSizeProgress);
         } else {
           scannerContext.clearProgress();
         }
@@ -6442,6 +6444,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
           }
           scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);
           scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);
+          limitScope = LimitScope.BETWEEN_ROWS;
+        }
+
+        if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
+          if (hasFilterRow) {
+            throw new IncompatibleFilterException(
+                "Filter whose hasFilterRow() returns true is incompatible with scans that must " +
+                    " stop mid-row because of a limit. ScannerContext:" + scannerContext);
+          }
+          return true;
         }
 
         // Check if we were getting data from the joinedHeap and hit the limit.
@@ -6472,6 +6484,11 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
               return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
             }
             results.clear();
+
+            // Read nothing as the rowkey was filtered, but still need to check time limit
+            if (scannerContext.checkTimeLimit(limitScope)) {
+              return true;
+            }
             continue;
           }
 
@@ -6498,16 +6515,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
             ret = filter.filterRowCellsWithRet(results);
 
             // We don't know how the results have changed after being filtered. Must set progress
-            // according to contents of results now. However, a change in the results should not
-            // affect the time progress. Thus preserve whatever time progress has been made
-            long timeProgress = scannerContext.getTimeProgress();
+            // according to contents of results now.
             if (scannerContext.getKeepProgress()) {
               scannerContext.setProgress(initialBatchProgress, initialSizeProgress,
-                  initialHeapSizeProgress, initialTimeProgress);
+                  initialHeapSizeProgress);
             } else {
               scannerContext.clearProgress();
             }
-            scannerContext.setTimeProgress(timeProgress);
             scannerContext.incrementBatchProgress(results.size());
             for (Cell cell : results) {
               scannerContext.incrementSizeProgress(PrivateCellUtil.estimatedSerializedSizeOf(cell),
@@ -6525,7 +6539,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
 
             // This row was totally filtered out, if this is NOT the last row,
             // we should continue on. Otherwise, nothing else to do.
-            if (!shouldStop) continue;
+            if (!shouldStop) {
+              // Read nothing as the cells were filtered, but still need to check time limit
+              if (scannerContext.checkTimeLimit(limitScope)) {
+                return true;
+              }
+              continue;
+            }
             return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();
           }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c88e570d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
index ef1fb68..26d8192 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/NoLimitScannerContext.java
@@ -68,17 +68,7 @@ public class NoLimitScannerContext extends ScannerContext {
   }
 
   @Override
-  void setTimeProgress(long timeProgress) {
-    // Do nothing. NoLimitScannerContext instances are immutable post-construction
-  }
-
-  @Override
-  void updateTimeProgress() {
-    // Do nothing. NoLimitScannerContext instances are immutable post-construction
-  }
-
-  @Override
-  void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress, long timeProgress) {
+  void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress) {
     // Do nothing. NoLimitScannerContext instances are immutable post-construction
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/c88e570d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
index 6b2267f..94d0811 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContext.java
@@ -51,11 +51,13 @@ import org.apache.hadoop.hbase.client.metrics.ServerSideScanMetrics;
 @InterfaceStability.Evolving
 public class ScannerContext {
 
+  LimitFields limits;
   /**
-   * Two sets of the same fields. One for the limits, another for the progress towards those limits
+   * A different set of progress fields. Only includes batch, dataSize and heapSize. Compared to
+   * LimitFields, ProgressFields doesn't contain a time field. Since a deadline is saved in
+   * LimitFields, {@link System#currentTimeMillis()} is used directly to check the time limit.
    */
-  LimitFields limits;
-  LimitFields progress;
+  ProgressFields progress;
 
   /**
    * The state of the scanner after the invocation of {@link InternalScanner#next(java.util.List)}
@@ -104,10 +106,12 @@ public class ScannerContext {
 
   ScannerContext(boolean keepProgress, LimitFields limitsToCopy, boolean trackMetrics) {
     this.limits = new LimitFields();
-    if (limitsToCopy != null) this.limits.copy(limitsToCopy);
+    if (limitsToCopy != null) {
+      this.limits.copy(limitsToCopy);
+    }
 
     // Progress fields are initialized to 0
-    progress = new LimitFields(0, LimitFields.DEFAULT_SCOPE, 0, 0, LimitFields.DEFAULT_SCOPE, 0);
+    progress = new ProgressFields(0, 0, 0);
 
     this.keepProgress = keepProgress;
     this.scannerState = DEFAULT_STATE;
@@ -162,9 +166,10 @@ public class ScannerContext {
 
   /**
    * Update the time progress with {@link System#currentTimeMillis()}
+   * @deprecated will be removed in 3.0
    */
+  @Deprecated
   void updateTimeProgress() {
-    progress.setTime(System.currentTimeMillis());
   }
 
   int getBatchProgress() {
@@ -179,14 +184,25 @@ public class ScannerContext {
     return progress.getHeapSize();
   }
 
+  /**
+   * @deprecated will be removed in 3.0
+   */
+  @Deprecated
   long getTimeProgress() {
-    return progress.getTime();
+    return System.currentTimeMillis();
   }
 
+  /**
+   * @deprecated will be removed in 3.0
+   */
+  @Deprecated
   void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress, long timeProgress) {
+    setProgress(batchProgress, sizeProgress, heapSizeProgress);
+  }
+
+  void setProgress(int batchProgress, long sizeProgress, long heapSizeProgress) {
     setBatchProgress(batchProgress);
     setSizeProgress(sizeProgress, heapSizeProgress);
-    setTimeProgress(timeProgress);
   }
 
   void setSizeProgress(long dataSizeProgress, long heapSizeProgress) {
@@ -198,8 +214,11 @@ public class ScannerContext {
     progress.setBatch(batchProgress);
   }
 
+  /**
+   * @deprecated will be removed in 3.0
+   */
+  @Deprecated
   void setTimeProgress(long timeProgress) {
-    progress.setTime(timeProgress);
   }
 
   /**
@@ -207,7 +226,7 @@ public class ScannerContext {
    * values
    */
   void clearProgress() {
-    progress.setFields(0, LimitFields.DEFAULT_SCOPE, 0, 0, LimitFields.DEFAULT_SCOPE, 0);
+    progress.setFields(0, 0, 0);
   }
 
   /**
@@ -319,7 +338,7 @@ public class ScannerContext {
    * @return true when the limit is enforceable from the checker's scope and it has been reached
    */
   boolean checkTimeLimit(LimitScope checkerScope) {
-    return hasTimeLimit(checkerScope) && progress.getTime() >= limits.getTime();
+    return hasTimeLimit(checkerScope) && (System.currentTimeMillis() >= limits.getTime());
   }
 
   /**
@@ -690,4 +709,81 @@ public class ScannerContext {
       return sb.toString();
     }
   }
+
+  private static class ProgressFields {
+
+    private static int DEFAULT_BATCH = -1;
+    private static long DEFAULT_SIZE = -1L;
+
+    // The batch limit will always be enforced between cells, thus, there isn't a field to hold the
+    // batch scope
+    int batch = DEFAULT_BATCH;
+
+    // The sum of cell data sizes (key + value). The cell data may reside on heap or off heap.
+    long dataSize = DEFAULT_SIZE;
+    // The sum of heap space occupied by all tracked cells. This includes the Cell POJO's overhead
+    // as well as the data of Cells that reside on heap.
+    long heapSize = DEFAULT_SIZE;
+
+    /**
+     * Fields keep their default values.
+     */
+    ProgressFields() {
+    }
+
+    ProgressFields(int batch, long size, long heapSize) {
+      setFields(batch, size, heapSize);
+    }
+
+    /**
+     * Set all fields together.
+     */
+    void setFields(int batch, long dataSize, long heapSize) {
+      setBatch(batch);
+      setDataSize(dataSize);
+      setHeapSize(heapSize);
+    }
+
+    int getBatch() {
+      return this.batch;
+    }
+
+    void setBatch(int batch) {
+      this.batch = batch;
+    }
+
+    long getDataSize() {
+      return this.dataSize;
+    }
+
+    long getHeapSize() {
+      return this.heapSize;
+    }
+
+    void setDataSize(long dataSize) {
+      this.dataSize = dataSize;
+    }
+
+    void setHeapSize(long heapSize) {
+      this.heapSize = heapSize;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("{");
+
+      sb.append("batch:");
+      sb.append(batch);
+
+      sb.append(", dataSize:");
+      sb.append(dataSize);
+
+      sb.append(", heapSize:");
+      sb.append(heapSize);
+
+      sb.append("}");
+      return sb.toString();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c88e570d/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
index 7e682f9..0b9b547 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/StoreScanner.java
@@ -552,7 +552,6 @@ public class StoreScanner extends NonReversedNonLazyKeyValueScanner
     LOOP: do {
       // Update and check the time limit based on the configured value of cellsPerTimeoutCheck
       if ((kvsScanned % cellsPerHeartbeatCheck == 0)) {
-        scannerContext.updateTimeProgress();
         if (scannerContext.checkTimeLimit(LimitScope.BETWEEN_CELLS)) {
           return scannerContext.setScannerState(NextState.TIME_LIMIT_REACHED).hasMoreValues();
         }

http://git-wip-us.apache.org/repos/asf/hbase/blob/c88e570d/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
index 0624a30..04ffd7b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestRawAsyncScanCursor.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.client;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
@@ -79,12 +80,9 @@ public class TestRawAsyncScanCursor extends AbstractTestScanCursor {
             assertEquals(1, results.length);
             assertEquals(NUM_ROWS - 1, count / NUM_FAMILIES / NUM_QUALIFIERS);
             // we will always provide a scan cursor if time limit is reached.
-            if (count == NUM_ROWS * NUM_FAMILIES * NUM_QUALIFIERS - 1) {
-              assertFalse(controller.cursor().isPresent());
-            } else {
-              assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1],
-                controller.cursor().get().getRow());
-            }
+            assertTrue(controller.cursor().isPresent());
+            assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1],
+              controller.cursor().get().getRow());
             assertArrayEquals(ROWS[reversed ? 0 : NUM_ROWS - 1], results[0].getRow());
             count++;
           } catch (Throwable e) {

http://git-wip-us.apache.org/repos/asf/hbase/blob/c88e570d/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
index fb02d9d..11cc365 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
@@ -144,12 +144,6 @@ public class TestScannerHeartbeatMessages {
 
   /**
    * Make puts to put the input value into each combination of row, family, and qualifier
-   * @param rows
-   * @param families
-   * @param qualifiers
-   * @param value
-   * @return
-   * @throws IOException
    */
   static ArrayList<Put> createPuts(byte[][] rows, byte[][] families, byte[][] qualifiers,
       byte[] value) throws IOException {
@@ -189,8 +183,6 @@ public class TestScannerHeartbeatMessages {
    * Run the test callable when heartbeats are enabled/disabled. We expect all tests to only pass
    * when heartbeat messages are enabled (otherwise the test is pointless). When heartbeats are
    * disabled, the test should throw an exception.
-   * @param testCallable
-   * @throws InterruptedException
    */
   private void testImportanceOfHeartbeats(Callable<Void> testCallable) throws InterruptedException {
     HeartbeatRPCServices.heartbeatsEnabled = true;
@@ -217,7 +209,6 @@ public class TestScannerHeartbeatMessages {
   /**
    * Test the case that the time limit for the scan is reached after each full row of cells is
    * fetched.
-   * @throws Exception
    */
   @Test
   public void testHeartbeatBetweenRows() throws Exception {
@@ -239,7 +230,6 @@ public class TestScannerHeartbeatMessages {
 
   /**
    * Test the case that the time limit for scans is reached in between column families
-   * @throws Exception
    */
   @Test
   public void testHeartbeatBetweenColumnFamilies() throws Exception {
@@ -263,7 +253,7 @@ public class TestScannerHeartbeatMessages {
     });
   }
 
-  public static class SparseFilter extends FilterBase {
+  public static class SparseCellFilter extends FilterBase {
 
     @Override
     public ReturnCode filterCell(final Cell v) throws IOException {
@@ -277,23 +267,39 @@ public class TestScannerHeartbeatMessages {
     }
 
     public static Filter parseFrom(final byte[] pbBytes) {
-      return new SparseFilter();
+      return new SparseCellFilter();
+    }
+  }
+
+  public static class SparseRowFilter extends FilterBase {
+
+    @Override
+    public boolean filterRowKey(Cell cell) throws IOException {
+      try {
+        Thread.sleep(CLIENT_TIMEOUT / 2 - 100);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      return !Bytes.equals(CellUtil.cloneRow(cell), ROWS[NUM_ROWS - 1]);
+    }
+
+    public static Filter parseFrom(final byte[] pbBytes) {
+      return new SparseRowFilter();
     }
   }
 
   /**
    * Test the case that there is a filter which filters most of cells
-   * @throws Exception
    */
   @Test
-  public void testHeartbeatWithSparseFilter() throws Exception {
+  public void testHeartbeatWithSparseCellFilter() throws Exception {
     testImportanceOfHeartbeats(new Callable<Void>() {
       @Override
       public Void call() throws Exception {
         Scan scan = new Scan();
         scan.setMaxResultSize(Long.MAX_VALUE);
         scan.setCaching(Integer.MAX_VALUE);
-        scan.setFilter(new SparseFilter());
+        scan.setFilter(new SparseCellFilter());
         ResultScanner scanner = TABLE.getScanner(scan);
         int num = 0;
         while (scanner.next() != null) {
@@ -305,7 +311,7 @@ public class TestScannerHeartbeatMessages {
         scan = new Scan();
         scan.setMaxResultSize(Long.MAX_VALUE);
         scan.setCaching(Integer.MAX_VALUE);
-        scan.setFilter(new SparseFilter());
+        scan.setFilter(new SparseCellFilter());
         scan.setAllowPartialResults(true);
         scanner = TABLE.getScanner(scan);
         num = 0;
@@ -321,6 +327,31 @@ public class TestScannerHeartbeatMessages {
   }
 
   /**
+   * Test the case that there is a filter which filters out most of the rows
+   */
+  @Test
+  public void testHeartbeatWithSparseRowFilter() throws Exception {
+    testImportanceOfHeartbeats(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        Scan scan = new Scan();
+        scan.setMaxResultSize(Long.MAX_VALUE);
+        scan.setCaching(Integer.MAX_VALUE);
+        scan.setFilter(new SparseRowFilter());
+        ResultScanner scanner = TABLE.getScanner(scan);
+        int num = 0;
+        while (scanner.next() != null) {
+          num++;
+        }
+        assertEquals(1, num);
+        scanner.close();
+
+        return null;
+      }
+    });
+  }
+
+  /**
    * Test the equivalence of a scan versus the same scan executed when heartbeat messages are
    * necessary
    * @param scan The scan configuration being tested
@@ -328,7 +359,6 @@ public class TestScannerHeartbeatMessages {
    * @param cfSleepTime The time to sleep between fetches of column family cells
    * @param sleepBeforeCf set to true when column family sleeps should occur before the cells for
    *          that column family are fetched
-   * @throws Exception
    */
   private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
       int cfSleepTime, boolean sleepBeforeCf) throws Exception {
@@ -361,8 +391,6 @@ public class TestScannerHeartbeatMessages {
   /**
    * Helper method for setting the time to sleep between rows and column families. If a sleep time
    * is negative then that sleep will be disabled
-   * @param rowSleepTime
-   * @param cfSleepTime
    */
   private static void configureSleepTime(int rowSleepTime, int cfSleepTime, boolean sleepBeforeCf) {
     HeartbeatHRegion.sleepBetweenRows = rowSleepTime > 0;