You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/05/23 14:31:22 UTC

[hbase] 23/27: HBASE-22036 Rewrite TestScannerHeartbeatMessages

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit d0b3a67304af71d879cdd538c4dfe28e58314350
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Thu Apr 25 18:18:58 2019 +0800

    HBASE-22036 Rewrite TestScannerHeartbeatMessages
---
 .../hbase/client/ScanPerNextResultScanner.java     | 147 +++++++++++++++++++++
 .../regionserver/TestScannerHeartbeatMessages.java |  71 +++++-----
 2 files changed, 187 insertions(+), 31 deletions(-)

diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ScanPerNextResultScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ScanPerNextResultScanner.java
new file mode 100644
index 0000000..c8665e9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ScanPerNextResultScanner.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+
+/**
+ * A ResultScanner which will only send request to RS when there are no cached results when calling
+ * next, just like the ResultScanner in the old time. Mainly used for writing UTs, that we can
+ * control when to send request to RS. The default ResultScanner implementation will fetch in
+ * background.
+ */
+@InterfaceAudience.Private
+public class ScanPerNextResultScanner implements ResultScanner, AdvancedScanResultConsumer {
+
+  private final AsyncTable<AdvancedScanResultConsumer> table;
+
+  private final Scan scan;
+
+  private final Queue<Result> queue = new ArrayDeque<>();
+
+  private ScanMetrics scanMetrics;
+
+  private boolean closed = false;
+
+  private Throwable error;
+
+  private ScanResumer resumer;
+
+  public ScanPerNextResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan) {
+    this.table = table;
+    this.scan = scan;
+  }
+
+  @Override
+  public synchronized void onError(Throwable error) {
+    this.error = error;
+    notifyAll();
+  }
+
+  @Override
+  public synchronized void onComplete() {
+    closed = true;
+    notifyAll();
+  }
+
+  @Override
+  public void onScanMetricsCreated(ScanMetrics scanMetrics) {
+    this.scanMetrics = scanMetrics;
+  }
+
+  @Override
+  public synchronized void onNext(Result[] results, ScanController controller) {
+    assert results.length > 0;
+    if (closed) {
+      controller.terminate();
+      return;
+    }
+    for (Result result : results) {
+      queue.add(result);
+    }
+    notifyAll();
+    resumer = controller.suspend();
+  }
+
+  @Override
+  public synchronized void onHeartbeat(ScanController controller) {
+    if (closed) {
+      controller.terminate();
+      return;
+    }
+    if (scan.isNeedCursorResult()) {
+      controller.cursor().ifPresent(c -> queue.add(Result.createCursorResult(c)));
+    }
+  }
+
+  @Override
+  public synchronized Result next() throws IOException {
+    if (queue.isEmpty()) {
+      if (resumer != null) {
+        resumer.resume();
+        resumer = null;
+      } else {
+        table.scan(scan, this);
+      }
+    }
+    while (queue.isEmpty()) {
+      if (closed) {
+        return null;
+      }
+      if (error != null) {
+        Throwables.propagateIfPossible(error, IOException.class);
+        throw new IOException(error);
+      }
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException();
+      }
+    }
+    return queue.poll();
+  }
+
+  @Override
+  public synchronized void close() {
+    closed = true;
+    queue.clear();
+    if (resumer != null) {
+      resumer.resume();
+      resumer = null;
+    }
+    notifyAll();
+  }
+
+  @Override
+  public boolean renewLease() {
+    // The renew lease operation will be handled in background
+    return false;
+  }
+
+  @Override
+  public ScanMetrics getScanMetrics() {
+    return scanMetrics;
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
index ea9f7e7..7a21941 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
@@ -39,11 +39,16 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTestConst;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScanPerNextResultScanner;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.client.TableDescriptor;
 import org.apache.hadoop.hbase.filter.Filter;
@@ -58,10 +63,10 @@ import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
 import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
 import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
 
@@ -75,11 +80,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRespon
  * the client when the server has exceeded the time limit during the processing of the scan. When
  * the time limit is reached, the server will return to the Client whatever Results it has
  * accumulated (potentially empty).
- * <p/>
- * TODO: with async client based sync client, we will fetch result in background which makes this
- * test broken. We need to find another way to implement the test.
  */
-@Ignore
 @Category(MediumTests.class)
 public class TestScannerHeartbeatMessages {
 
@@ -89,7 +90,7 @@ public class TestScannerHeartbeatMessages {
 
   private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
 
-  private static Table TABLE = null;
+  private static AsyncConnection CONN;
 
   /**
    * Table configuration
@@ -141,16 +142,19 @@ public class TestScannerHeartbeatMessages {
     conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
     TEST_UTIL.startMiniCluster(1);
 
-    TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
+    createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
+
+    Configuration newConf = new Configuration(conf);
+    newConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
+    newConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
+    CONN = ConnectionFactory.createAsyncConnection(newConf).get();
   }
 
-  static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
-      byte[][] qualifiers, byte[] cellValue) throws IOException {
+  static void createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers,
+      byte[] cellValue) throws IOException {
     Table ht = TEST_UTIL.createTable(name, families);
     List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
     ht.put(puts);
-    ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
-    return ht;
   }
 
   /**
@@ -177,6 +181,7 @@ public class TestScannerHeartbeatMessages {
 
   @AfterClass
   public static void tearDownAfterClass() throws Exception {
+    Closeables.close(CONN, true);
     TEST_UTIL.shutdownMiniCluster();
   }
 
@@ -311,26 +316,28 @@ public class TestScannerHeartbeatMessages {
         scan.setMaxResultSize(Long.MAX_VALUE);
         scan.setCaching(Integer.MAX_VALUE);
         scan.setFilter(new SparseCellFilter());
-        ResultScanner scanner = TABLE.getScanner(scan);
-        int num = 0;
-        while (scanner.next() != null) {
-          num++;
+        try (ScanPerNextResultScanner scanner =
+          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
+          int num = 0;
+          while (scanner.next() != null) {
+            num++;
+          }
+          assertEquals(1, num);
         }
-        assertEquals(1, num);
-        scanner.close();
 
         scan = new Scan();
         scan.setMaxResultSize(Long.MAX_VALUE);
         scan.setCaching(Integer.MAX_VALUE);
         scan.setFilter(new SparseCellFilter());
         scan.setAllowPartialResults(true);
-        scanner = TABLE.getScanner(scan);
-        num = 0;
-        while (scanner.next() != null) {
-          num++;
+        try (ScanPerNextResultScanner scanner =
+          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
+          int num = 0;
+          while (scanner.next() != null) {
+            num++;
+          }
+          assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
         }
-        assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
-        scanner.close();
 
         return null;
       }
@@ -349,13 +356,14 @@ public class TestScannerHeartbeatMessages {
         scan.setMaxResultSize(Long.MAX_VALUE);
         scan.setCaching(Integer.MAX_VALUE);
         scan.setFilter(new SparseRowFilter());
-        ResultScanner scanner = TABLE.getScanner(scan);
-        int num = 0;
-        while (scanner.next() != null) {
-          num++;
+        try (ScanPerNextResultScanner scanner =
+          new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
+          int num = 0;
+          while (scanner.next() != null) {
+            num++;
+          }
+          assertEquals(1, num);
         }
-        assertEquals(1, num);
-        scanner.close();
 
         return null;
       }
@@ -374,8 +382,9 @@ public class TestScannerHeartbeatMessages {
   private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
       int cfSleepTime, boolean sleepBeforeCf) throws Exception {
     disableSleeping();
-    final ResultScanner scanner = TABLE.getScanner(scan);
-    final ResultScanner scannerWithHeartbeats = TABLE.getScanner(scan);
+    AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
+    final ResultScanner scanner = new ScanPerNextResultScanner(table, scan);
+    final ResultScanner scannerWithHeartbeats = new ScanPerNextResultScanner(table, scan);
 
     Result r1 = null;
     Result r2 = null;