You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by te...@apache.org on 2016/03/09 18:29:13 UTC

hbase git commit: HBASE-15378 Scanner cannot handle heartbeat message with no results (Phil Yang)

Repository: hbase
Updated Branches:
  refs/heads/master 9e967e5c1 -> ad9b91a90


HBASE-15378 Scanner cannot handle heartbeat message with no results (Phil Yang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ad9b91a9
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ad9b91a9
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ad9b91a9

Branch: refs/heads/master
Commit: ad9b91a9042607c4528ac79b2aed1254d99f6db4
Parents: 9e967e5
Author: tedyu <yu...@gmail.com>
Authored: Wed Mar 9 09:28:54 2016 -0800
Committer: tedyu <yu...@gmail.com>
Committed: Wed Mar 9 09:28:54 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/ClientScanner.java      | 61 +++++++++----------
 .../TestScannerHeartbeatMessages.java           | 64 +++++++++++++++++++-
 2 files changed, 91 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ad9b91a9/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 1658e5b..22a56e3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -381,9 +381,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
     Result[] values = null;
     long remainingResultSize = maxScannerResultSize;
     int countdown = this.caching;
-
-    // We need to reset it if it's a new callable that was created
-    // with a countdown in nextScanner
+    // We need to reset it if it's a new callable that was created with a countdown in nextScanner
     callable.setCaching(this.caching);
     // This flag is set when we want to skip the result returned. We do
     // this when we reset scanner because it split under us.
@@ -397,12 +395,11 @@ public abstract class ClientScanner extends AbstractClientScanner {
         // returns an empty array if scanning is to go on and we've just
         // exhausted current region.
         values = call(callable, caller, scannerTimeout);
-        // When the replica switch happens, we need to do certain operations
-        // again. The callable will openScanner with the right startkey
-        // but we need to pick up from there. Bypass the rest of the loop
-        // and let the catch-up happen in the beginning of the loop as it
-        // happens for the cases where we see exceptions. Since only openScanner
-        // would have happened, values would be null
+        // When the replica switch happens, we need to do certain operations again.
+        // The callable will openScanner with the right startkey but we need to pick up
+        // from there. Bypass the rest of the loop and let the catch-up happen in the beginning
+        // of the loop as it happens for the cases where we see exceptions.
+        // Since only openScanner would have happened, values would be null
         if (values == null && callable.switchedToADifferentReplica()) {
           // Any accumulated partial results are no longer valid since the callable will
           // openScanner with the correct startkey and we must pick up from there
@@ -415,7 +412,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
         // An exception was thrown which makes any partial results that we were collecting
         // invalid. The scanner will need to be reset to the beginning of a row.
         clearPartialResults();
-
         // DNRIOEs are thrown to make us break out of retries. Some types of DNRIOEs want us
         // to reset the scanner and come back in again.
         if (e instanceof UnknownScannerException) {
@@ -424,8 +420,8 @@ public abstract class ClientScanner extends AbstractClientScanner {
           // a ScannerTimeoutException. Else, it's because the region moved and we used the old
           // id against the new region server; reset the scanner.
           if (timeout < System.currentTimeMillis()) {
-            LOG.info("For hints related to the following exception, please try taking a look at: " +
-                "https://hbase.apache.org/book.html#trouble.client.scantimeout");
+            LOG.info("For hints related to the following exception, please try taking a look at: "
+                    + "https://hbase.apache.org/book.html#trouble.client.scantimeout");
             long elapsed = System.currentTimeMillis() - lastNext;
             ScannerTimeoutException ex =
                 new ScannerTimeoutException(elapsed + "ms passed since the last invocation, "
@@ -440,8 +436,7 @@ public abstract class ClientScanner extends AbstractClientScanner {
           if ((cause != null && cause instanceof NotServingRegionException) ||
               (cause != null && cause instanceof RegionServerStoppedException) ||
               e instanceof OutOfOrderScannerNextException) {
-            // Pass
-            // It is easier writing the if loop test as list of what is allowed rather than
+            // Pass. It is easier writing the if loop test as list of what is allowed rather than
             // as a list of what is not allowed... so if in here, it means we do not throw.
           } else {
             throw e;
@@ -449,11 +444,9 @@ public abstract class ClientScanner extends AbstractClientScanner {
         }
         // Else, its signal from depths of ScannerCallable that we need to reset the scanner.
         if (this.lastResult != null) {
-          // The region has moved. We need to open a brand new scanner at
-          // the new location.
-          // Reset the startRow to the row we've seen last so that the new
-          // scanner starts at the correct row. Otherwise we may see previously
-          // returned rows again.
+          // The region has moved. We need to open a brand new scanner at the new location.
+          // Reset the startRow to the row we've seen last so that the new scanner starts at
+          // the correct row. Otherwise we may see previously returned rows again.
           // (ScannerCallable by now has "relocated" the correct region)
           if (scan.isReversed()) {
             scan.setStartRow(createClosestRowBefore(lastResult.getRow()));
@@ -475,7 +468,6 @@ public abstract class ClientScanner extends AbstractClientScanner {
         // Set this to zero so we don't try and do an rpc and close on remote server when
         // the exception we got was UnknownScanner or the Server is going down.
         callable = null;
-
         // This continue will take us to while at end of loop where we will set up new scanner.
         continue;
       }
@@ -499,17 +491,19 @@ public abstract class ClientScanner extends AbstractClientScanner {
           this.lastResult = rs;
         }
       }
-
-      // Caller of this method just wants a Result. If we see a heartbeat message, it means
-      // processing of the scan is taking a long time server side. Rather than continue to
-      // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
-      // unnecesary delays to the caller
-      if (callable.isHeartbeatMessage() && cache.size() > 0) {
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("Heartbeat message received and cache contains Results."
-              + " Breaking out of scan loop");
+      if (callable.isHeartbeatMessage()) {
+        if (cache.size() > 0) {
+          // Caller of this method just wants a Result. If we see a heartbeat message, it means
+          // processing of the scan is taking a long time server side. Rather than continue to
+          // loop until a limit (e.g. size or caching) is reached, break out early to avoid causing
+          // unnecesary delays to the caller
+          if (LOG.isTraceEnabled()) {
+            LOG.trace("Heartbeat message received and cache contains Results."
+                    + " Breaking out of scan loop");
+          }
+          break;
         }
-        break;
+        continue;
       }
 
       // We expect that the server won't have more results for us when we exhaust
@@ -519,14 +513,15 @@ public abstract class ClientScanner extends AbstractClientScanner {
       if (null != values && values.length > 0 && callable.hasMoreResultsContext()) {
         // Only adhere to more server results when we don't have any partialResults
         // as it keeps the outer loop logic the same.
-        serverHasMoreResults = callable.getServerHasMoreResults() & partialResults.isEmpty();
+        serverHasMoreResults = callable.getServerHasMoreResults() && partialResults.isEmpty();
       }
       // Values == null means server-side filter has determined we must STOP
       // !partialResults.isEmpty() means that we are still accumulating partial Results for a
       // row. We should not change scanners before we receive all the partial Results for that
       // row.
-    } while (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
-        && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null)));
+    } while ((callable != null && callable.isHeartbeatMessage())
+        || (doneWithRegion(remainingResultSize, countdown, serverHasMoreResults)
+        && (!partialResults.isEmpty() || possiblyNextScanner(countdown, values == null))));
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/ad9b91a9/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
index 14c71d2..1935c0a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -33,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.CoordinatedStateManager;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HConstants;
@@ -47,6 +49,8 @@ import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.client.ScannerCallable;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
 import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanResponse;
 import org.apache.hadoop.hbase.regionserver.HRegion.RegionScannerImpl;
@@ -86,7 +90,7 @@ public class TestScannerHeartbeatMessages {
    */
   private static TableName TABLE_NAME = TableName.valueOf("testScannerHeartbeatMessagesTable");
 
-  private static int NUM_ROWS = 10;
+  private static int NUM_ROWS = 5;
   private static byte[] ROW = Bytes.toBytes("testRow");
   private static byte[][] ROWS = HTestConst.makeNAscii(ROW, NUM_ROWS);
 
@@ -197,6 +201,7 @@ public class TestScannerHeartbeatMessages {
   public void testScannerHeartbeatMessages() throws Exception {
     testImportanceOfHeartbeats(testHeartbeatBetweenRows());
     testImportanceOfHeartbeats(testHeartbeatBetweenColumnFamilies());
+    testImportanceOfHeartbeats(testHeartbeatWithSparseFilter());
   }
 
   /**
@@ -275,6 +280,63 @@ public class TestScannerHeartbeatMessages {
     };
   }
 
+  public static class SparseFilter extends FilterBase{
+
+    @Override
+    public ReturnCode filterKeyValue(Cell v) throws IOException {
+      try {
+        Thread.sleep(SERVER_TIME_LIMIT + 10);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+      return Bytes.equals(CellUtil.cloneRow(v), ROWS[NUM_ROWS - 1]) ?
+          ReturnCode.INCLUDE :
+          ReturnCode.SKIP;
+    }
+
+    public static Filter parseFrom(final byte [] pbBytes){
+      return new SparseFilter();
+    }
+  }
+
+  /**
+   * Test the case that there is a filter which filters most of cells
+   * @throws Exception
+   */
+  public Callable<Void> testHeartbeatWithSparseFilter() throws Exception {
+    return new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        Scan scan = new Scan();
+        scan.setMaxResultSize(Long.MAX_VALUE);
+        scan.setCaching(Integer.MAX_VALUE);
+        scan.setFilter(new SparseFilter());
+        ResultScanner scanner = TABLE.getScanner(scan);
+        int num = 0;
+        while (scanner.next() != null) {
+          num++;
+        }
+        assertEquals(1, num);
+        scanner.close();
+
+        scan = new Scan();
+        scan.setMaxResultSize(Long.MAX_VALUE);
+        scan.setCaching(Integer.MAX_VALUE);
+        scan.setFilter(new SparseFilter());
+        scan.setAllowPartialResults(true);
+        scanner = TABLE.getScanner(scan);
+        num = 0;
+        while (scanner.next() != null) {
+          num++;
+        }
+        assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
+        scanner.close();
+
+        return null;
+      }
+    };
+  }
+
   /**
    * Test the equivalence of a scan versus the same scan executed when heartbeat messages are
    * necessary