You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2017/03/16 23:50:05 UTC
hbase git commit: HBASE-17778 Remove the testing code in the AsyncRequestFutureImpl
Repository: hbase
Updated Branches:
refs/heads/master 7c19490ba -> e2a070cae
HBASE-17778 Remove the testing code in the AsyncRequestFutureImpl
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e2a070ca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e2a070ca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e2a070ca
Branch: refs/heads/master
Commit: e2a070cae0ab785b771a923146116c0e9f3452a5
Parents: 7c19490
Author: CHIA-PING TSAI <ch...@gmail.com>
Authored: Mon Mar 13 14:33:36 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Fri Mar 17 07:49:13 2017 +0800
----------------------------------------------------------------------
.../hbase/client/AsyncRequestFutureImpl.java | 49 ++++----------------
.../hadoop/hbase/client/TestAsyncProcess.java | 33 +++++++++++--
2 files changed, 40 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/e2a070ca/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index 41431bb..e6e4fd1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -28,7 +28,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -181,13 +180,14 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
* Runnable (that can be submitted to thread pool) that submits MultiAction to a
* single server. The server call is synchronous, therefore we do it on a thread pool.
*/
- private final class SingleServerRequestRunnable implements Runnable {
+ @VisibleForTesting
+ final class SingleServerRequestRunnable implements Runnable {
private final MultiAction multiAction;
private final int numAttempt;
private final ServerName server;
private final Set<CancellableRegionServerCallable> callsInProgress;
- private Long heapSize = null;
- private SingleServerRequestRunnable(
+ @VisibleForTesting
+ SingleServerRequestRunnable(
MultiAction multiAction, int numAttempt, ServerName server,
Set<CancellableRegionServerCallable> callsInProgress) {
this.multiAction = multiAction;
@@ -196,24 +196,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
this.callsInProgress = callsInProgress;
}
- @VisibleForTesting
- long heapSize() {
- if (heapSize != null) {
- return heapSize;
- }
- heapSize = 0L;
- for (Map.Entry<byte[], List<Action>> e: this.multiAction.actions.entrySet()) {
- List<Action> actions = e.getValue();
- for (Action action: actions) {
- Row row = action.getAction();
- if (row instanceof Mutation) {
- heapSize += ((Mutation) row).heapSize();
- }
- }
- }
- return heapSize;
- }
-
@Override
public void run() {
AbstractResponse res = null;
@@ -303,7 +285,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
private final CancellableRegionServerCallable currentCallable;
private final int operationTimeout;
private final int rpcTimeout;
- private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
private final AsyncProcess asyncProcess;
/**
@@ -423,20 +404,11 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
}
@VisibleForTesting
- Map<ServerName, List<Long>> getRequestHeapSize() {
- return heapSizesByServer;
+ SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt, ServerName server,
+ Set<CancellableRegionServerCallable> callsInProgress) {
+ return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress);
}
- private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server,
- SingleServerRequestRunnable runnable) {
- List<Long> heapCount = heapSizesByServer.get(server);
- if (heapCount == null) {
- heapCount = new LinkedList<>();
- heapSizesByServer.put(server, heapCount);
- }
- heapCount.add(runnable.heapSize());
- return runnable;
- }
/**
* Group a list of actions per region servers, and send them.
*
@@ -608,8 +580,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
}
asyncProcess.incTaskCounters(multiAction.getRegions(), server);
- SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server,
- new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress));
+ SingleServerRequestRunnable runnable = createSingleServerRequest(
+ multiAction, numAttempt, server, callsInProgress);
return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable));
}
@@ -631,8 +603,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
for (DelayingRunner runner : actions.values()) {
asyncProcess.incTaskCounters(runner.getActions().getRegions(), server);
String traceText = "AsyncProcess.sendMultiAction";
- Runnable runnable = addSingleServerRequestHeapSize(server,
- new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress));
+ Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress);
// use a delay runner only if we need to sleep for some time
if (runner.getSleepTime() > 0) {
runner.setRunner(runnable);
http://git-wip-us.apache.org/repos/asf/hbase/blob/e2a070ca/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index f2f0467..3139af1 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -261,7 +261,7 @@ public class TestAsyncProcess {
static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
-
+ private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
long nonceGroup, AsyncProcess asyncProcess) {
super(task, actions, nonceGroup, asyncProcess);
@@ -272,6 +272,33 @@ public class TestAsyncProcess {
// Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
}
+ Map<ServerName, List<Long>> getRequestHeapSize() {
+ return heapSizesByServer;
+ }
+
+ @Override
+ SingleServerRequestRunnable createSingleServerRequest(
+ MultiAction multiAction, int numAttempt, ServerName server,
+ Set<CancellableRegionServerCallable> callsInProgress) {
+ SingleServerRequestRunnable rq = new SingleServerRequestRunnable(
+ multiAction, numAttempt, server, callsInProgress);
+ List<Long> heapCount = heapSizesByServer.get(server);
+ if (heapCount == null) {
+ heapCount = new ArrayList<>();
+ heapSizesByServer.put(server, heapCount);
+ }
+ heapCount.add(heapSizeOf(multiAction));
+ return rq;
+ }
+
+ private long heapSizeOf(MultiAction multiAction) {
+ return multiAction.actions.values().stream()
+ .flatMap(v -> v.stream())
+ .map(action -> action.getAction())
+ .filter(row -> row instanceof Mutation)
+ .mapToLong(row -> ((Mutation) row).heapSize())
+ .sum();
+ }
}
static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{
@@ -635,7 +662,7 @@ public class TestAsyncProcess {
if (!(req instanceof AsyncRequestFutureImpl)) {
continue;
}
- AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req;
+ MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req;
if (ars.getRequestHeapSize().containsKey(sn)) {
++actualSnReqCount;
}
@@ -651,7 +678,7 @@ public class TestAsyncProcess {
if (!(req instanceof AsyncRequestFutureImpl)) {
continue;
}
- AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req;
+ MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req;
Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize();
for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) {
long sum = 0;