You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/02/20 03:16:30 UTC

[hbase] branch branch-2 updated: HBASE-21930 Deal with ScannerResetException when opening region scanner

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 4b36de3  HBASE-21930 Deal with ScannerResetException when opening region scanner
4b36de3 is described below

commit 4b36de3161fd5c891d82027334ace550808ebbd6
Author: zhangduo <zh...@apache.org>
AuthorDate: Tue Feb 19 22:08:14 2019 +0800

    HBASE-21930 Deal with ScannerResetException when opening region scanner
    
    Signed-off-by: Zheng Hu <op...@gmail.com>
---
 .../hbase/client/AsyncRpcRetryingCaller.java       |   9 +-
 .../AsyncScanSingleRegionRpcRetryingCaller.java    |  11 +-
 .../hbase/client/TestAsyncTableScanException.java  | 179 +++++++++++++++++++++
 3 files changed, 193 insertions(+), 6 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
index 45266e9..387b103 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRpcRetryingCaller.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hbase.NotServingRegionException;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotEnabledException;
 import org.apache.hadoop.hbase.TableNotFoundException;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
 import org.apache.hadoop.hbase.ipc.HBaseRpcController;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FutureUtils;
@@ -151,7 +152,13 @@ public abstract class AsyncRpcRetryingCaller<T> {
       return;
     }
     Throwable error = translateException(t);
-    if (error instanceof DoNotRetryIOException) {
+    // We use this retrying caller to open a scanner, since opening is idempotent. However,
+    // opening a scanner now also fetches data, so the open call may fail with a
+    // ScannerResetException, which is a DoNotRetryIOException. It is declared that way so that
+    // hitting it in the middle of a scan makes us open a new scanner instead of retrying on the
+    // old one. But here we are exactly trying to open a new scanner, so retrying on
+    // ScannerResetException is the right thing to do.
+    if (error instanceof DoNotRetryIOException && !(error instanceof ScannerResetException)) {
       future.completeExceptionally(error);
       return;
     }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
index 96961af..ab37b5d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncScanSingleRegionRpcRetryingCaller.java
@@ -393,11 +393,12 @@ class AsyncScanSingleRegionRpcRetryingCaller {
           " ms",
         error);
     }
-    boolean scannerClosed = error instanceof UnknownScannerException ||
-        error instanceof NotServingRegionException || error instanceof RegionServerStoppedException;
+    boolean scannerClosed =
+      error instanceof UnknownScannerException || error instanceof NotServingRegionException ||
+        error instanceof RegionServerStoppedException || error instanceof ScannerResetException;
     RetriesExhaustedException.ThrowableWithExtraContext qt =
-        new RetriesExhaustedException.ThrowableWithExtraContext(error,
-            EnvironmentEdgeManager.currentTime(), "");
+      new RetriesExhaustedException.ThrowableWithExtraContext(error,
+        EnvironmentEdgeManager.currentTime(), "");
     exceptions.add(qt);
     if (tries >= maxAttempts) {
       completeExceptionally(!scannerClosed);
@@ -418,7 +419,7 @@ class AsyncScanSingleRegionRpcRetryingCaller {
       completeWhenError(false);
       return;
     }
-    if (error instanceof OutOfOrderScannerNextException || error instanceof ScannerResetException) {
+    if (error instanceof OutOfOrderScannerNextException) {
       completeWhenError(true);
       return;
     }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanException.java
new file mode 100644
index 0000000..a1715cc
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncTableScanException.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.exceptions.ScannerResetException;
+import org.apache.hadoop.hbase.regionserver.InternalScanner;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
+
+@Category({ MediumTests.class, ClientTests.class })
+public class TestAsyncTableScanException {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestAsyncTableScanException.class);
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static TableName TABLE_NAME = TableName.valueOf("scan");
+
+  private static byte[] FAMILY = Bytes.toBytes("family");
+
+  private static byte[] QUAL = Bytes.toBytes("qual");
+
+  private static AsyncConnection CONN;
+
+  private static AtomicInteger REQ_COUNT = new AtomicInteger();
+
+  private static volatile int ERROR_AT;
+
+  private static volatile boolean ERROR;
+
+  private static volatile boolean DO_NOT_RETRY;
+
+  public static final class ErrorCP implements RegionObserver, RegionCoprocessor {
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public boolean postScannerNext(ObserverContext<RegionCoprocessorEnvironment> c,
+        InternalScanner s, List<Result> result, int limit, boolean hasNext) throws IOException {
+      REQ_COUNT.incrementAndGet();
+      if ((ERROR_AT == REQ_COUNT.get()) || ERROR) {
+        if (DO_NOT_RETRY) {
+          throw new DoNotRetryIOException("Injected exception");
+        } else {
+          throw new IOException("Injected exception");
+        }
+      }
+      return RegionObserver.super.postScannerNext(c, s, result, limit, hasNext);
+    }
+
+  }
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    UTIL.startMiniCluster(3);
+    UTIL.getAdmin()
+      .createTable(TableDescriptorBuilder.newBuilder(TABLE_NAME)
+        .setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY))
+        .setCoprocessor(ErrorCP.class.getName()).build());
+    try (Table table = UTIL.getConnection().getTable(TABLE_NAME)) {
+      for (int i = 0; i < 100; i++) {
+        table.put(new Put(Bytes.toBytes(i)).addColumn(FAMILY, QUAL, Bytes.toBytes(i)));
+      }
+    }
+    CONN = ConnectionFactory.createAsyncConnection(UTIL.getConfiguration()).get();
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    Closeables.close(CONN, true);
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Before
+  public void setUpBeforeTest() {
+    REQ_COUNT.set(0);
+    ERROR_AT = 0;
+    ERROR = false;
+    DO_NOT_RETRY = false;
+  }
+
+  @Test(expected = DoNotRetryIOException.class)
+  public void testDoNotRetryIOException() throws IOException {
+    ERROR_AT = 1;
+    DO_NOT_RETRY = true;
+    try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(FAMILY)) {
+      scanner.next();
+    }
+  }
+
+  @Test
+  public void testIOException() throws IOException {
+    ERROR = true;
+    try (ResultScanner scanner =
+      CONN.getTableBuilder(TABLE_NAME).setMaxAttempts(3).build().getScanner(FAMILY)) {
+      scanner.next();
+      fail();
+    } catch (RetriesExhaustedException e) {
+      // expected
+      assertThat(e.getCause(), instanceOf(ScannerResetException.class));
+    }
+    assertTrue(REQ_COUNT.get() >= 3);
+  }
+
+  private void count() throws IOException {
+    try (ResultScanner scanner = CONN.getTable(TABLE_NAME).getScanner(new Scan().setCaching(1))) {
+      for (int i = 0; i < 100; i++) {
+        Result result = scanner.next();
+        assertArrayEquals(Bytes.toBytes(i), result.getRow());
+        assertArrayEquals(Bytes.toBytes(i), result.getValue(FAMILY, QUAL));
+      }
+    }
+  }
+
+  @Test
+  public void testRecoveryFromScannerResetWhileOpening() throws IOException {
+    ERROR_AT = 1;
+    count();
+    // we should at least request 1 time otherwise the error will not be triggered, and then we
+    // need at least one more request to get the remaining results.
+    assertTrue(REQ_COUNT.get() >= 2);
+  }
+
+  @Test
+  public void testRecoveryFromScannerResetInTheMiddle() throws IOException {
+    ERROR_AT = 2;
+    count();
+    // we should at least request 2 times otherwise the error will not be triggered, and then we
+    // need at least one more request to get the remaining results.
+    assertTrue(REQ_COUNT.get() >= 3);
+  }
+}