You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2019/05/02 09:05:35 UTC
[hbase] 23/25: HBASE-22036 Rewrite TestScannerHeartbeatMessages
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit ad03167b73e8382f81c4c5b9512fcc52f6ba3604
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Thu Apr 25 18:18:58 2019 +0800
HBASE-22036 Rewrite TestScannerHeartbeatMessages
---
.../hbase/client/ScanPerNextResultScanner.java | 147 +++++++++++++++++++++
.../regionserver/TestScannerHeartbeatMessages.java | 71 +++++-----
2 files changed, 187 insertions(+), 31 deletions(-)
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ScanPerNextResultScanner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ScanPerNextResultScanner.java
new file mode 100644
index 0000000..c8665e9
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/ScanPerNextResultScanner.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Throwables;
+
+/**
+ * A {@link ResultScanner} which only sends a request to the region server when {@link #next()} is
+ * called and there are no cached results, just like the ResultScanner in the old days. Mainly used
+ * for writing UTs, so that we can control when a request is sent to the RS. The default
+ * ResultScanner implementation fetches in the background.
+ * <p>
+ * The scan is started lazily by the first call to {@link #next()} and is started at most once.
+ * After every batch delivered through {@link #onNext(Result[], ScanController)} the scan is
+ * suspended, and it is only resumed by a later {@link #next()} call that finds the queue empty.
+ * <p>
+ * The consumer callbacks (except {@link #onScanMetricsCreated(ScanMetrics)}), {@link #next()} and
+ * {@link #close()} are synchronized on this object and coordinate through wait/notifyAll.
+ */
+@InterfaceAudience.Private
+public class ScanPerNextResultScanner implements ResultScanner, AdvancedScanResultConsumer {
+
+  private final AsyncTable<AdvancedScanResultConsumer> table;
+
+  private final Scan scan;
+
+  /** Results received from the scan but not yet handed out by {@link #next()}. */
+  private final Queue<Result> queue = new ArrayDeque<>();
+
+  // NOTE(review): written and read without holding the monitor, unlike the other fields.
+  private ScanMetrics scanMetrics;
+
+  /** Whether {@code table.scan} has been invoked; guards against starting the scan twice. */
+  private boolean started = false;
+
+  /** Set when the scan completed normally or {@link #close()} was called. */
+  private boolean closed = false;
+
+  /** The failure reported through {@link #onError(Throwable)}, if any. */
+  private Throwable error;
+
+  /** Handle for resuming the scan suspended by the last {@code onNext}; null if not suspended. */
+  private ScanResumer resumer;
+
+  /**
+   * Creates the scanner. No request is sent until {@link #next()} is called.
+   * @param table the async table used to run the scan
+   * @param scan the scan to execute
+   */
+  public ScanPerNextResultScanner(AsyncTable<AdvancedScanResultConsumer> table, Scan scan) {
+    this.table = table;
+    this.scan = scan;
+  }
+
+  /**
+   * Records the failure and wakes up any thread blocked in {@link #next()}.
+   */
+  @Override
+  public synchronized void onError(Throwable error) {
+    this.error = error;
+    notifyAll();
+  }
+
+  /**
+   * Marks the scanner as finished and wakes up any thread blocked in {@link #next()}.
+   */
+  @Override
+  public synchronized void onComplete() {
+    closed = true;
+    notifyAll();
+  }
+
+  @Override
+  public void onScanMetricsCreated(ScanMetrics scanMetrics) {
+    this.scanMetrics = scanMetrics;
+  }
+
+  /**
+   * Queues the received batch, wakes up the waiting caller and suspends the scan so that no
+   * further request is sent until {@link #next()} asks for more. If the scanner has already been
+   * closed the scan is terminated instead.
+   */
+  @Override
+  public synchronized void onNext(Result[] results, ScanController controller) {
+    assert results.length > 0;
+    if (closed) {
+      controller.terminate();
+      return;
+    }
+    for (Result result : results) {
+      queue.add(result);
+    }
+    notifyAll();
+    resumer = controller.suspend();
+  }
+
+  /**
+   * Terminates the scan if the scanner has been closed. Otherwise, when the scan asks for cursor
+   * results and the controller carries a cursor, queues a cursor result and wakes up the waiting
+   * caller. The scan is not suspended here.
+   */
+  @Override
+  public synchronized void onHeartbeat(ScanController controller) {
+    if (closed) {
+      controller.terminate();
+      return;
+    }
+    if (scan.isNeedCursorResult()) {
+      controller.cursor().ifPresent(c -> {
+        queue.add(Result.createCursorResult(c));
+        // Without this a thread blocked in next() would not see the cursor result until the next
+        // onNext/onComplete/onError notification arrives.
+        notifyAll();
+      });
+    }
+  }
+
+  /**
+   * Returns the next result, blocking until one is available.
+   * <p>
+   * If nothing is cached, the suspended scan is resumed, or the scan is started if that has not
+   * happened yet. Once the scanner is closed, completed or failed, no request is issued any more.
+   * @return the next result, or {@code null} if the scan has completed or the scanner was closed
+   * @throws IOException if the scan failed, or {@link InterruptedIOException} if the calling thread
+   *           was interrupted while waiting
+   */
+  @Override
+  public synchronized Result next() throws IOException {
+    // Never resume or (re)start once we are in a terminal state, and never start a second scan
+    // while the first one is still in flight; the same consumer must not receive two scans.
+    if (queue.isEmpty() && !closed && error == null) {
+      if (resumer != null) {
+        resumer.resume();
+        resumer = null;
+      } else if (!started) {
+        started = true;
+        table.scan(scan, this);
+      }
+    }
+    while (queue.isEmpty()) {
+      if (closed) {
+        return null;
+      }
+      if (error != null) {
+        // Rethrow as-is when it is already an IOException or unchecked, otherwise wrap it.
+        Throwables.propagateIfPossible(error, IOException.class);
+        throw new IOException(error);
+      }
+      try {
+        wait();
+      } catch (InterruptedException e) {
+        throw new InterruptedIOException();
+      }
+    }
+    return queue.poll();
+  }
+
+  /**
+   * Closes the scanner: drops cached results, resumes a suspended scan so that its next callback
+   * can observe the closed state and terminate, and wakes up any thread blocked in
+   * {@link #next()}.
+   */
+  @Override
+  public synchronized void close() {
+    closed = true;
+    queue.clear();
+    if (resumer != null) {
+      resumer.resume();
+      resumer = null;
+    }
+    notifyAll();
+  }
+
+  @Override
+  public boolean renewLease() {
+    // The renew lease operation will be handled in background
+    return false;
+  }
+
+  @Override
+  public ScanMetrics getScanMetrics() {
+    return scanMetrics;
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
index ea9f7e7..7a21941 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerHeartbeatMessages.java
@@ -39,11 +39,16 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTestConst;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.AdvancedScanResultConsumer;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScanPerNextResultScanner;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.filter.Filter;
@@ -58,10 +63,10 @@ import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
-import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
@@ -75,11 +80,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRespon
* the client when the server has exceeded the time limit during the processing of the scan. When
* the time limit is reached, the server will return to the Client whatever Results it has
* accumulated (potentially empty).
- * <p/>
- * TODO: with async client based sync client, we will fetch result in background which makes this
- * test broken. We need to find another way to implement the test.
*/
-@Ignore
@Category(MediumTests.class)
public class TestScannerHeartbeatMessages {
@@ -89,7 +90,7 @@ public class TestScannerHeartbeatMessages {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
- private static Table TABLE = null;
+ private static AsyncConnection CONN;
/**
* Table configuration
@@ -141,16 +142,19 @@ public class TestScannerHeartbeatMessages {
conf.setLong(StoreScanner.HBASE_CELLS_SCANNED_PER_HEARTBEAT_CHECK, 1);
TEST_UTIL.startMiniCluster(1);
- TABLE = createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
+ createTestTable(TABLE_NAME, ROWS, FAMILIES, QUALIFIERS, VALUE);
+
+ Configuration newConf = new Configuration(conf);
+ newConf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
+ newConf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, CLIENT_TIMEOUT);
+ CONN = ConnectionFactory.createAsyncConnection(newConf).get();
}
- static Table createTestTable(TableName name, byte[][] rows, byte[][] families,
- byte[][] qualifiers, byte[] cellValue) throws IOException {
+ static void createTestTable(TableName name, byte[][] rows, byte[][] families, byte[][] qualifiers,
+ byte[] cellValue) throws IOException {
Table ht = TEST_UTIL.createTable(name, families);
List<Put> puts = createPuts(rows, families, qualifiers, cellValue);
ht.put(puts);
- ht.getConfiguration().setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, CLIENT_TIMEOUT);
- return ht;
}
/**
@@ -177,6 +181,7 @@ public class TestScannerHeartbeatMessages {
@AfterClass
public static void tearDownAfterClass() throws Exception {
+ Closeables.close(CONN, true);
TEST_UTIL.shutdownMiniCluster();
}
@@ -311,26 +316,28 @@ public class TestScannerHeartbeatMessages {
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseCellFilter());
- ResultScanner scanner = TABLE.getScanner(scan);
- int num = 0;
- while (scanner.next() != null) {
- num++;
+ try (ScanPerNextResultScanner scanner =
+ new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
+ int num = 0;
+ while (scanner.next() != null) {
+ num++;
+ }
+ assertEquals(1, num);
}
- assertEquals(1, num);
- scanner.close();
scan = new Scan();
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseCellFilter());
scan.setAllowPartialResults(true);
- scanner = TABLE.getScanner(scan);
- num = 0;
- while (scanner.next() != null) {
- num++;
+ try (ScanPerNextResultScanner scanner =
+ new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
+ int num = 0;
+ while (scanner.next() != null) {
+ num++;
+ }
+ assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
}
- assertEquals(NUM_FAMILIES * NUM_QUALIFIERS, num);
- scanner.close();
return null;
}
@@ -349,13 +356,14 @@ public class TestScannerHeartbeatMessages {
scan.setMaxResultSize(Long.MAX_VALUE);
scan.setCaching(Integer.MAX_VALUE);
scan.setFilter(new SparseRowFilter());
- ResultScanner scanner = TABLE.getScanner(scan);
- int num = 0;
- while (scanner.next() != null) {
- num++;
+ try (ScanPerNextResultScanner scanner =
+ new ScanPerNextResultScanner(CONN.getTable(TABLE_NAME), scan)) {
+ int num = 0;
+ while (scanner.next() != null) {
+ num++;
+ }
+ assertEquals(1, num);
}
- assertEquals(1, num);
- scanner.close();
return null;
}
@@ -374,8 +382,9 @@ public class TestScannerHeartbeatMessages {
private void testEquivalenceOfScanWithHeartbeats(final Scan scan, int rowSleepTime,
int cfSleepTime, boolean sleepBeforeCf) throws Exception {
disableSleeping();
- final ResultScanner scanner = TABLE.getScanner(scan);
- final ResultScanner scannerWithHeartbeats = TABLE.getScanner(scan);
+ AsyncTable<AdvancedScanResultConsumer> table = CONN.getTable(TABLE_NAME);
+ final ResultScanner scanner = new ScanPerNextResultScanner(table, scan);
+ final ResultScanner scannerWithHeartbeats = new ScanPerNextResultScanner(table, scan);
Result r1 = null;
Result r2 = null;