You are viewing a plain text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@hbase.apache.org by dd...@apache.org on 2014/12/23 01:22:23 UTC
hbase git commit: HBASE-12012. Improve cancellation for the scan RPCs
Repository: hbase
Updated Branches:
refs/heads/master 83db450fc -> 1c45d1cd9
HBASE-12012. Improve cancellation for the scan RPCs
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1c45d1cd
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1c45d1cd
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1c45d1cd
Branch: refs/heads/master
Commit: 1c45d1cd9d6c585d0e169769ce400034686cf179
Parents: 83db450
Author: Devaraj Das <dd...@apache.org>
Authored: Mon Dec 22 16:22:04 2014 -0800
Committer: Devaraj Das <dd...@apache.org>
Committed: Mon Dec 22 16:22:04 2014 -0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/client/Cancellable.java | 31 ++++
.../hadoop/hbase/client/ClientScanner.java | 7 +
.../hadoop/hbase/client/ClientSmallScanner.java | 7 +-
.../client/ResultBoundedCompletionService.java | 165 +++++++++++++++++++
.../RpcRetryingCallerWithReadReplicas.java | 154 ++---------------
.../hadoop/hbase/client/ScannerCallable.java | 7 +-
.../client/ScannerCallableWithReplicas.java | 105 +++++++++---
.../hadoop/hbase/client/TestReplicasClient.java | 48 ++++++
8 files changed, 356 insertions(+), 168 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c45d1cd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cancellable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cancellable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cancellable.java
new file mode 100644
index 0000000..43011e9
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Cancellable.java
@@ -0,0 +1,31 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * Implemented by the Get/Scan callables that talk to replica regions. When an RPC response
+ * is received from one of the replicas, the RPCs to the other replicas are cancelled; e.g.
+ * ResultBoundedCompletionService cancels submitted callables that implement this interface.
+ */
+@InterfaceAudience.Private
+interface Cancellable {
+  void cancel(); // request cancellation of this callable
+  boolean isCancelled(); // whether this callable has been cancelled
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c45d1cd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
index 05202c8..3f1ba84 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientScanner.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.protobuf.generated.MapReduceProtos;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Implements the scanner interface for the HBase client.
* If there are multiple regions in a table, this scanner will iterate
@@ -275,6 +277,11 @@ public class ClientScanner extends AbstractClientScanner {
return true;
}
+  @VisibleForTesting // lets tests check whether the callable cancelled any of its RPCs
+  boolean isAnyRPCcancelled() {
+    return this.callable.isAnyRPCcancelled();
+  }
+
static Result[] call(Scan scan, ScannerCallableWithReplicas callable,
RpcRetryingCaller<Result[]> caller, int scannerTimeout)
throws IOException, RuntimeException {
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c45d1cd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
index 90fb1c2..9fc9cc6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClientSmallScanner.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
-import org.apache.hadoop.hbase.ipc.PayloadCarryingRpcController;
import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.RequestConverter;
@@ -169,7 +168,7 @@ public class ClientSmallScanner extends ClientScanner {
ScanRequest request = RequestConverter.buildScanRequest(getLocation()
.getRegionInfo().getRegionName(), getScan(), getCaching(), true);
ScanResponse response = null;
- PayloadCarryingRpcController controller = controllerFactory.newController();
+ controller = controllerFactory.newController();
try {
controller.setPriority(getTableName());
controller.setCallTimeout(timeout);
@@ -183,8 +182,8 @@ public class ClientSmallScanner extends ClientScanner {
@Override
public ScannerCallable getScannerCallableForReplica(int id) {
- return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(), scanMetrics,
- controllerFactory, getCaching(), id);
+ return new SmallScannerCallable((ClusterConnection)connection, tableName, getScan(),
+ scanMetrics, controllerFactory, getCaching(), id);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c45d1cd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
new file mode 100644
index 0000000..1dab776
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ResultBoundedCompletionService.java
@@ -0,0 +1,165 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executor;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.htrace.Trace;
+
+/**
+ * A completion service for the RpcRetryingCallerFactory.
+ * Keeps the list of the futures, and allows to cancel them all.
+ * This means as well that it can be used for a small set of tasks only.
+ * <br>submit() and cancelAll() are not thread safe and should be called from one thread;
+ * take(), poll() and the futures' get() block on {@code tasks}, which every task notifies.
+ */
+@InterfaceAudience.Private
+public class ResultBoundedCompletionService<V> {
+  private final RpcRetryingCallerFactory retryingCallerFactory;
+  private final Executor executor;
+  private final QueueingFuture<V>[] tasks; // all the tasks; also the wait/notify monitor
+  private volatile QueueingFuture<V> completed = null;
+
+  class QueueingFuture<T> implements RunnableFuture<T> {
+    private final RetryingCallable<T> future;
+    // Written by the executor thread, read by other threads: volatile for visibility.
+    private volatile T result = null;
+    private volatile ExecutionException exeEx = null;
+    private volatile boolean cancelled;
+    private final int callTimeout;
+    private final RpcRetryingCaller<T> retryingCaller;
+    private volatile boolean resultObtained = false;
+
+    public QueueingFuture(RetryingCallable<T> future, int callTimeout) {
+      this.future = future;
+      this.callTimeout = callTimeout;
+      this.retryingCaller = retryingCallerFactory.<T>newCaller();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void run() {
+      try {
+        if (!cancelled) {
+          result = this.retryingCaller.callWithRetries(future, callTimeout);
+          resultObtained = true;
+        }
+      } catch (Throwable t) {
+        exeEx = new ExecutionException(t);
+      } finally {
+        synchronized (tasks) {
+          if (!cancelled && completed == null) { // first finisher wins; atomic under the lock
+            completed = (QueueingFuture<V>) QueueingFuture.this;
+          }
+          tasks.notifyAll(); // wake every waiter: take(), poll(), and get() on any future
+        }
+      }
+    }
+
+    @Override
+    public boolean cancel(boolean mayInterruptIfRunning) {
+      if (isDone()) return false;
+      retryingCaller.cancel();
+      if (future instanceof Cancellable) ((Cancellable) future).cancel();
+      cancelled = true;
+      return true;
+    }
+
+    @Override
+    public boolean isCancelled() {
+      return cancelled;
+    }
+
+    @Override
+    public boolean isDone() {
+      return resultObtained || exeEx != null;
+    }
+
+    @Override
+    public T get() throws InterruptedException, ExecutionException {
+      try {
+        return get(1000, TimeUnit.DAYS);
+      } catch (TimeoutException e) {
+        throw new RuntimeException("You did wait for 1000 days here?", e);
+      }
+    }
+
+    @Override
+    public T get(long timeout, TimeUnit unit)
+        throws InterruptedException, ExecutionException, TimeoutException {
+      // Loop on a deadline: an early wakeup (spurious or from another task) is not a timeout.
+      final long deadline = System.nanoTime() + unit.toNanos(timeout);
+      synchronized (tasks) {
+        while (!isDone()) {
+          final long remaining = deadline - System.nanoTime();
+          if (remaining <= 0) throw new TimeoutException("timeout=" + timeout + ", " + unit);
+          TimeUnit.NANOSECONDS.timedWait(tasks, remaining);
+        }
+      }
+      if (resultObtained) return result;
+      throw exeEx;
+    }
+  }
+
+  @SuppressWarnings("unchecked")
+  public ResultBoundedCompletionService(
+      RpcRetryingCallerFactory retryingCallerFactory, Executor executor,
+      int maxTasks) {
+    this.retryingCallerFactory = retryingCallerFactory;
+    this.executor = executor;
+    this.tasks = new QueueingFuture[maxTasks];
+  }
+
+  public void submit(RetryingCallable<V> task, int callTimeout, int id) {
+    QueueingFuture<V> newFuture = new QueueingFuture<V>(task, callTimeout);
+    executor.execute(Trace.wrap(newFuture));
+    tasks[id] = newFuture;
+  }
+
+  public QueueingFuture<V> take() throws InterruptedException {
+    synchronized (tasks) {
+      while (completed == null) tasks.wait();
+    }
+    return completed;
+  }
+
+  public QueueingFuture<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
+    // Same deadline loop as get(): return null only once the whole timeout has elapsed.
+    final long deadline = System.nanoTime() + unit.toNanos(timeout);
+    synchronized (tasks) {
+      while (completed == null) {
+        final long remaining = deadline - System.nanoTime();
+        if (remaining <= 0) break;
+        TimeUnit.NANOSECONDS.timedWait(tasks, remaining);
+      }
+    }
+    return completed;
+  }
+
+  public void cancelAll() {
+    for (QueueingFuture<V> future : tasks) {
+      if (future != null) future.cancel(true);
+    }
+  }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c45d1cd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
index 57accce..273a1e1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/RpcRetryingCallerWithReadReplicas.java
@@ -27,12 +27,9 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
-import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -53,7 +50,6 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import com.google.protobuf.ServiceException;
-import org.htrace.Trace;
/**
* Caller that goes to replica if the primary region does no answer within a configurable
@@ -99,7 +95,7 @@ public class RpcRetryingCallerWithReadReplicas {
* - we need to stop retrying when the call is completed
* - we can be interrupted
*/
- class ReplicaRegionServerCallable extends RegionServerCallable<Result> {
+ class ReplicaRegionServerCallable extends RegionServerCallable<Result> implements Cancellable {
final int id;
private final PayloadCarryingRpcController controller;
@@ -112,7 +108,8 @@ public class RpcRetryingCallerWithReadReplicas {
controller.setPriority(tableName);
}
- public void startCancel() {
+ @Override
+ public void cancel() {
controller.startCancel();
}
@@ -169,6 +166,11 @@ public class RpcRetryingCallerWithReadReplicas {
throw ProtobufUtil.getRemoteException(se);
}
}
+
+ @Override
+ public boolean isCancelled() {
+ return controller.isCanceled();
+ }
}
/**
@@ -194,7 +196,8 @@ public class RpcRetryingCallerWithReadReplicas {
RegionLocations rl = getRegionLocations(true, (isTargetReplicaSpecified ? get.getReplicaId()
: RegionReplicaUtil.DEFAULT_REPLICA_ID), cConnection, tableName, get.getRow());
- ResultBoundedCompletionService cs = new ResultBoundedCompletionService(pool, rl.size());
+ ResultBoundedCompletionService<Result> cs =
+ new ResultBoundedCompletionService<Result>(this.rpcRetryingCallerFactory, pool, rl.size());
if(isTargetReplicaSpecified) {
addCallsForReplica(cs, rl, get.getReplicaId(), get.getReplicaId());
@@ -273,12 +276,12 @@ public class RpcRetryingCallerWithReadReplicas {
* @param min - the id of the first replica, inclusive
* @param max - the id of the last replica, inclusive.
*/
- private void addCallsForReplica(ResultBoundedCompletionService cs,
+ private void addCallsForReplica(ResultBoundedCompletionService<Result> cs,
RegionLocations rl, int min, int max) {
for (int id = min; id <= max; id++) {
HRegionLocation hrl = rl.getRegionLocation(id);
ReplicaRegionServerCallable callOnReplica = new ReplicaRegionServerCallable(id, hrl);
- cs.submit(callOnReplica, callTimeout);
+ cs.submit(callOnReplica, callTimeout, id);
}
}
@@ -308,137 +311,4 @@ public class RpcRetryingCallerWithReadReplicas {
return rl;
}
-
-
- /**
- * A completion service for the RpcRetryingCallerFactory.
- * Keeps the list of the futures, and allows to cancel them all.
- * This means as well that it can be used for a small set of tasks only.
- * <br>Implementation is not Thread safe.
- */
- public class ResultBoundedCompletionService {
- private final Executor executor;
- private final QueueingFuture[] tasks; // all the tasks
- private volatile QueueingFuture completed = null;
-
- class QueueingFuture implements RunnableFuture<Result> {
- private final ReplicaRegionServerCallable future;
- private Result result = null;
- private ExecutionException exeEx = null;
- private volatile boolean canceled;
- private final int callTimeout;
- private final RpcRetryingCaller<Result> retryingCaller;
-
-
- public QueueingFuture(ReplicaRegionServerCallable future, int callTimeout) {
- this.future = future;
- this.callTimeout = callTimeout;
- this.retryingCaller = rpcRetryingCallerFactory.<Result>newCaller();
- }
-
- @Override
- public void run() {
- try {
- if (!canceled) {
- result =
- rpcRetryingCallerFactory.<Result>newCaller().callWithRetries(future, callTimeout);
- }
- } catch (Throwable t) {
- exeEx = new ExecutionException(t);
- } finally {
- if (!canceled && completed == null) {
- completed = QueueingFuture.this;
- synchronized (tasks) {
- tasks.notify();
- }
- }
- }
- }
-
- @Override
- public boolean cancel(boolean mayInterruptIfRunning) {
- if (result != null || exeEx != null) return false;
- retryingCaller.cancel();
- future.startCancel();
- canceled = true;
- return true;
- }
-
- @Override
- public boolean isCancelled() {
- return canceled;
- }
-
- @Override
- public boolean isDone() {
- return result != null || exeEx != null;
- }
-
- @Override
- public Result get() throws InterruptedException, ExecutionException {
- try {
- return get(1000, TimeUnit.DAYS);
- } catch (TimeoutException e) {
- throw new RuntimeException("You did wait for 1000 days here?", e);
- }
- }
-
- @edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RCN_REDUNDANT_NULLCHECK_OF_NULL_VALUE",
- justification="Is this an issue?")
- @Override
- public Result get(long timeout, TimeUnit unit)
- throws InterruptedException, ExecutionException, TimeoutException {
- synchronized (tasks) {
- if (result != null) {
- return result;
- }
- if (exeEx != null) {
- throw exeEx;
- }
- unit.timedWait(tasks, timeout);
- }
- // Findbugs says this null check is redundant. Will result be set across the wait above?
- if (result != null) {
- return result;
- }
- if (exeEx != null) {
- throw exeEx;
- }
-
- throw new TimeoutException("timeout=" + timeout + ", " + unit);
- }
- }
-
- public ResultBoundedCompletionService(Executor executor, int maxTasks) {
- this.executor = executor;
- this.tasks = new QueueingFuture[maxTasks];
- }
-
-
- public void submit(ReplicaRegionServerCallable task, int callTimeout) {
- QueueingFuture newFuture = new QueueingFuture(task, callTimeout);
- executor.execute(Trace.wrap(newFuture));
- tasks[task.id] = newFuture;
- }
-
- public QueueingFuture take() throws InterruptedException {
- synchronized (tasks) {
- while (completed == null) tasks.wait();
- }
- return completed;
- }
-
- public QueueingFuture poll(long timeout, TimeUnit unit) throws InterruptedException {
- synchronized (tasks) {
- if (completed == null) unit.timedWait(tasks, timeout);
- }
- return completed;
- }
-
- public void cancelAll() {
- for (QueueingFuture future : tasks) {
- if (future != null) future.cancel(true);
- }
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c45d1cd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
index 0aecef2..22f98a3 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallable.java
@@ -88,6 +88,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
protected boolean isRegionServerRemote = true;
private long nextCallSeq = 0;
protected RpcControllerFactory controllerFactory;
+  protected volatile PayloadCarryingRpcController controller; // may be read by a cancelling thread
/**
* @param connection which connection
@@ -123,6 +124,10 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
this.controllerFactory = rpcControllerFactory;
}
+  PayloadCarryingRpcController getController() {
+    return this.controller; // controller of the most recent RPC issued; may still be null
+  }
+
/**
* @param reload force reload of server location
* @throws IOException
@@ -191,7 +196,7 @@ public class ScannerCallable extends RegionServerCallable<Result[]> {
incRPCcallsMetrics();
request = RequestConverter.buildScanRequest(scannerId, caching, false, nextCallSeq);
ScanResponse response = null;
- PayloadCarryingRpcController controller = controllerFactory.newController();
+ controller = controllerFactory.newController();
controller.setPriority(getTableName());
controller.setCallTimeout(callTimeout);
try {
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c45d1cd/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
index 0de658b..4209987 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ScannerCallableWithReplicas.java
@@ -39,8 +39,9 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.BoundedCompletionService;
import org.apache.hadoop.hbase.util.Pair;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* This class has the logic for handling scanners for regions with and without replicas.
* 1. A scan is attempted on the default (primary) region
@@ -69,8 +70,9 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
private Configuration conf;
private int scannerTimeout;
private Set<ScannerCallable> outstandingCallables = new HashSet<ScannerCallable>();
+  private volatile boolean someRPCcancelled = false; // for tests; set by RetryingRPC.cancel()
- public ScannerCallableWithReplicas (TableName tableName, ClusterConnection cConnection,
+ public ScannerCallableWithReplicas(TableName tableName, ClusterConnection cConnection,
ScannerCallable baseCallable, ExecutorService pool, int timeBeforeReplicas, Scan scan,
int retries, int scannerTimeout, int caching, Configuration conf,
RpcRetryingCaller<Result []> caller) {
@@ -134,8 +136,10 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// allocate a boundedcompletion pool of some multiple of number of replicas.
// We want to accomodate some RPCs for redundant replica scans (but are still in progress)
- BoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
- new BoundedCompletionService<Pair<Result[], ScannerCallable>>(pool, rl.size() * 5);
+ ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs =
+ new ResultBoundedCompletionService<Pair<Result[], ScannerCallable>>(
+ new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf), pool,
+ rl.size() * 5);
List<ExecutionException> exceptions = null;
int submitted = 0, completed = 0;
@@ -192,7 +196,7 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
} finally {
// We get there because we were interrupted or because one or more of the
// calls succeeded or failed. In all case, we stop all our tasks.
- cs.cancelAll(true);
+ cs.cancelAll();
}
if (exceptions != null && !exceptions.isEmpty()) {
@@ -226,8 +230,14 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
// want to wait for the "close" to happen yet. The "wait" will happen when
// the table is closed (when the awaitTermination of the underlying pool is called)
s.setClose();
- RetryingRPC r = new RetryingRPC(s);
- pool.submit(r);
+ final RetryingRPC r = new RetryingRPC(s);
+ pool.submit(new Callable<Void>(){
+ @Override
+ public Void call() throws Exception {
+ r.call(scannerTimeout);
+ return null;
+ }
+ });
}
// now clear outstandingCallables since we scheduled a close for all the contained scanners
outstandingCallables.clear();
@@ -244,16 +254,16 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
}
private int addCallsForCurrentReplica(
- BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
+ ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl) {
RetryingRPC retryingOnReplica = new RetryingRPC(currentScannerCallable);
outstandingCallables.add(currentScannerCallable);
- cs.submit(retryingOnReplica);
+ cs.submit(retryingOnReplica, scannerTimeout, currentScannerCallable.id);
return 1;
}
private int addCallsForOtherReplicas(
- BoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl, int min,
- int max) {
+ ResultBoundedCompletionService<Pair<Result[], ScannerCallable>> cs, RegionLocations rl,
+ int min, int max) {
if (scan.getConsistency() == Consistency.STRONG) {
return 0; // not scheduling on other replicas for strong consistency
}
@@ -267,32 +277,85 @@ class ScannerCallableWithReplicas implements RetryingCallable<Result[]> {
}
outstandingCallables.add(s);
RetryingRPC retryingOnReplica = new RetryingRPC(s);
- cs.submit(retryingOnReplica);
+ cs.submit(retryingOnReplica, scannerTimeout, id);
}
return max - min + 1;
}
- class RetryingRPC implements Callable<Pair<Result[], ScannerCallable>> {
+  @VisibleForTesting // lets tests observe that some RetryingRPC was cancelled
+  boolean isAnyRPCcancelled() {
+    return this.someRPCcancelled;
+  }
+
+ class RetryingRPC implements RetryingCallable<Pair<Result[], ScannerCallable>>, Cancellable {
final ScannerCallable callable;
+ RpcRetryingCaller<Result[]> caller;
+ private volatile boolean cancelled = false;
RetryingRPC(ScannerCallable callable) {
this.callable = callable;
- }
-
- @Override
- public Pair<Result[], ScannerCallable> call() throws IOException {
// For the Consistency.STRONG (default case), we reuse the caller
// to keep compatibility with what is done in the past
// For the Consistency.TIMELINE case, we can't reuse the caller
// since we could be making parallel RPCs (caller.callWithRetries is synchronized
// and we can't invoke it multiple times at the same time)
- RpcRetryingCaller<Result[]> caller = ScannerCallableWithReplicas.this.caller;
+ this.caller = ScannerCallableWithReplicas.this.caller;
if (scan.getConsistency() == Consistency.TIMELINE) {
- caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
+ this.caller = new RpcRetryingCallerFactory(ScannerCallableWithReplicas.this.conf).
<Result[]>newCaller();
}
- Result[] res = caller.callWithRetries(callable, scannerTimeout);
- return new Pair<Result[], ScannerCallable>(res, callable);
+ }
+
+ @Override
+ public Pair<Result[], ScannerCallable> call(int callTimeout) throws IOException {
+ // since the retries is done within the ResultBoundedCompletionService,
+ // we don't invoke callWithRetries here
+ if (cancelled) {
+ return null;
+ }
+ Result[] res = this.caller.callWithoutRetries(this.callable, callTimeout);
+ return new Pair<Result[], ScannerCallable>(res, this.callable);
+ }
+
+ @Override
+ public void prepare(boolean reload) throws IOException {
+ if (cancelled) return;
+
+ if (Thread.interrupted()) {
+ throw new InterruptedIOException();
+ }
+
+ callable.prepare(reload);
+ }
+
+ @Override
+ public void throwable(Throwable t, boolean retrying) {
+ callable.throwable(t, retrying);
+ }
+
+ @Override
+ public String getExceptionMessageAdditionalDetail() {
+ return callable.getExceptionMessageAdditionalDetail();
+ }
+
+ @Override
+ public long sleep(long pause, int tries) {
+ return callable.sleep(pause, tries);
+ }
+
+ @Override
+ public void cancel() {
+ cancelled = true;
+ caller.cancel();
+ if (callable.getController() != null) {
+ callable.getController().startCancel();
+ }
+ someRPCcancelled = true;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return cancelled;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1c45d1cd/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
index 3d1b1c8..bb2d4db 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestReplicasClient.java
@@ -539,6 +539,54 @@ public class TestReplicasClient {
runMultipleScansOfOneType(true, false);
}
+  /** A TIMELINE scan with slowed-down 'next' calls must report that an RPC was cancelled. */
+  @Test
+  public void testCancelOfScan() throws Exception {
+    openRegion(hriSecondary);
+    int NUMROWS = 100;
+    try {
+      for (int i = 0; i < NUMROWS; i++) {
+        byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
+        Put p = new Put(b1);
+        p.add(f, b1, b1);
+        table.put(p);
+      }
+      LOG.debug("PUT done");
+      int caching = 20;
+      byte[] start = Bytes.toBytes("testUseRegionWithReplica" + 0);
+
+      flushRegion(hriPrimary);
+      LOG.info("flush done");
+      Thread.sleep(1000 + REFRESH_PERIOD * 2);
+
+      // now make some 'next' calls slow
+      SlowMeCopro.slowDownNext.set(true);
+      SlowMeCopro.countOfNext.set(0);
+      SlowMeCopro.sleepTime.set(5000);
+      Scan scan = new Scan(start);
+      scan.setCaching(caching);
+      scan.setConsistency(Consistency.TIMELINE);
+      ResultScanner scanner = table.getScanner(scan);
+      Iterator<Result> iter = scanner.iterator();
+      iter.next();
+      Assert.assertTrue(((ClientScanner)scanner).isAnyRPCcancelled());
+      SlowMeCopro.slowDownNext.set(false);
+      SlowMeCopro.countOfNext.set(0);
+      scanner.close(); // release the scanner once 'next' calls are no longer slowed down
+    } finally {
+      SlowMeCopro.getCdl().get().countDown();
+      SlowMeCopro.sleepTime.set(0);
+      SlowMeCopro.slowDownNext.set(false);
+      SlowMeCopro.countOfNext.set(0);
+      for (int i = 0; i < NUMROWS; i++) {
+        byte[] b1 = Bytes.toBytes("testUseRegionWithReplica" + i);
+        Delete d = new Delete(b1);
+        table.delete(d);
+      }
+      closeRegion(hriSecondary);
+    }
+  }
+
private void runMultipleScansOfOneType(boolean reversed, boolean small) throws Exception {
openRegion(hriSecondary);
int NUMROWS = 100;