You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ch...@apache.org on 2017/03/16 23:50:05 UTC

hbase git commit: HBASE-17778 Remove the testing code in the AsyncRequestFutureImpl

Repository: hbase
Updated Branches:
  refs/heads/master 7c19490ba -> e2a070cae


HBASE-17778 Remove the testing code in the AsyncRequestFutureImpl


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e2a070ca
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e2a070ca
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e2a070ca

Branch: refs/heads/master
Commit: e2a070cae0ab785b771a923146116c0e9f3452a5
Parents: 7c19490
Author: CHIA-PING TSAI <ch...@gmail.com>
Authored: Mon Mar 13 14:33:36 2017 +0800
Committer: Chia-Ping Tsai <ch...@gmail.com>
Committed: Fri Mar 17 07:49:13 2017 +0800

----------------------------------------------------------------------
 .../hbase/client/AsyncRequestFutureImpl.java    | 49 ++++----------------
 .../hadoop/hbase/client/TestAsyncProcess.java   | 33 +++++++++++--
 2 files changed, 40 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e2a070ca/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
index 41431bb..e6e4fd1 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRequestFutureImpl.java
@@ -28,7 +28,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -181,13 +180,14 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
    * Runnable (that can be submitted to thread pool) that submits MultiAction to a
    * single server. The server call is synchronous, therefore we do it on a thread pool.
    */
-  private final class SingleServerRequestRunnable implements Runnable {
+  @VisibleForTesting
+  final class SingleServerRequestRunnable implements Runnable {
     private final MultiAction multiAction;
     private final int numAttempt;
     private final ServerName server;
     private final Set<CancellableRegionServerCallable> callsInProgress;
-    private Long heapSize = null;
-    private SingleServerRequestRunnable(
+    @VisibleForTesting
+    SingleServerRequestRunnable(
         MultiAction multiAction, int numAttempt, ServerName server,
         Set<CancellableRegionServerCallable> callsInProgress) {
       this.multiAction = multiAction;
@@ -196,24 +196,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
       this.callsInProgress = callsInProgress;
     }
 
-    @VisibleForTesting
-    long heapSize() {
-      if (heapSize != null) {
-        return heapSize;
-      }
-      heapSize = 0L;
-      for (Map.Entry<byte[], List<Action>> e: this.multiAction.actions.entrySet()) {
-        List<Action> actions = e.getValue();
-        for (Action action: actions) {
-          Row row = action.getAction();
-          if (row instanceof Mutation) {
-            heapSize += ((Mutation) row).heapSize();
-          }
-        }
-      }
-      return heapSize;
-    }
-
     @Override
     public void run() {
       AbstractResponse res = null;
@@ -303,7 +285,6 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
   private final CancellableRegionServerCallable currentCallable;
   private final int operationTimeout;
   private final int rpcTimeout;
-  private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
   private final AsyncProcess asyncProcess;
 
   /**
@@ -423,20 +404,11 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
   }
 
   @VisibleForTesting
-  Map<ServerName, List<Long>> getRequestHeapSize() {
-    return heapSizesByServer;
+  SingleServerRequestRunnable createSingleServerRequest(MultiAction multiAction, int numAttempt, ServerName server,
+        Set<CancellableRegionServerCallable> callsInProgress) {
+    return new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress);
   }
 
-  private SingleServerRequestRunnable addSingleServerRequestHeapSize(ServerName server,
-    SingleServerRequestRunnable runnable) {
-    List<Long> heapCount = heapSizesByServer.get(server);
-    if (heapCount == null) {
-      heapCount = new LinkedList<>();
-      heapSizesByServer.put(server, heapCount);
-    }
-    heapCount.add(runnable.heapSize());
-    return runnable;
-  }
   /**
    * Group a list of actions per region servers, and send them.
    *
@@ -608,8 +580,8 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
         asyncProcess.connection.getConnectionMetrics().incrNormalRunners();
       }
       asyncProcess.incTaskCounters(multiAction.getRegions(), server);
-      SingleServerRequestRunnable runnable = addSingleServerRequestHeapSize(server,
-          new SingleServerRequestRunnable(multiAction, numAttempt, server, callsInProgress));
+      SingleServerRequestRunnable runnable = createSingleServerRequest(
+              multiAction, numAttempt, server, callsInProgress);
       return Collections.singletonList(Trace.wrap("AsyncProcess.sendMultiAction", runnable));
     }
 
@@ -631,8 +603,7 @@ class AsyncRequestFutureImpl<CResult> implements AsyncRequestFuture {
     for (DelayingRunner runner : actions.values()) {
       asyncProcess.incTaskCounters(runner.getActions().getRegions(), server);
       String traceText = "AsyncProcess.sendMultiAction";
-      Runnable runnable = addSingleServerRequestHeapSize(server,
-          new SingleServerRequestRunnable(runner.getActions(), numAttempt, server, callsInProgress));
+      Runnable runnable = createSingleServerRequest(runner.getActions(), numAttempt, server, callsInProgress);
       // use a delay runner only if we need to sleep for some time
       if (runner.getSleepTime() > 0) {
         runner.setRunner(runnable);

http://git-wip-us.apache.org/repos/asf/hbase/blob/e2a070ca/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
index f2f0467..3139af1 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestAsyncProcess.java
@@ -261,7 +261,7 @@ public class TestAsyncProcess {
 
 
   static class MyAsyncRequestFutureImpl<Res> extends AsyncRequestFutureImpl<Res> {
-
+    private final Map<ServerName, List<Long>> heapSizesByServer = new HashMap<>();
     public MyAsyncRequestFutureImpl(AsyncProcessTask task, List<Action> actions,
       long nonceGroup, AsyncProcess asyncProcess) {
       super(task, actions, nonceGroup, asyncProcess);
@@ -272,6 +272,33 @@ public class TestAsyncProcess {
       // Do nothing for avoiding the NPE if we test the ClientBackofPolicy.
     }
 
+    Map<ServerName, List<Long>> getRequestHeapSize() {
+      return heapSizesByServer;
+    }
+
+    @Override
+    SingleServerRequestRunnable createSingleServerRequest(
+          MultiAction multiAction, int numAttempt, ServerName server,
+        Set<CancellableRegionServerCallable> callsInProgress) {
+      SingleServerRequestRunnable rq = new SingleServerRequestRunnable(
+              multiAction, numAttempt, server, callsInProgress);
+      List<Long> heapCount = heapSizesByServer.get(server);
+      if (heapCount == null) {
+        heapCount = new ArrayList<>();
+        heapSizesByServer.put(server, heapCount);
+      }
+      heapCount.add(heapSizeOf(multiAction));
+      return rq;
+    }
+
+    private long heapSizeOf(MultiAction multiAction) {
+      return multiAction.actions.values().stream()
+              .flatMap(v -> v.stream())
+              .map(action -> action.getAction())
+              .filter(row -> row instanceof Mutation)
+              .mapToLong(row -> ((Mutation) row).heapSize())
+              .sum();
+    }
   }
 
   static class CallerWithFailure extends RpcRetryingCallerImpl<AbstractResponse>{
@@ -635,7 +662,7 @@ public class TestAsyncProcess {
         if (!(req instanceof AsyncRequestFutureImpl)) {
           continue;
         }
-        AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req;
+        MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req;
         if (ars.getRequestHeapSize().containsKey(sn)) {
           ++actualSnReqCount;
         }
@@ -651,7 +678,7 @@ public class TestAsyncProcess {
         if (!(req instanceof AsyncRequestFutureImpl)) {
           continue;
         }
-        AsyncRequestFutureImpl ars = (AsyncRequestFutureImpl) req;
+        MyAsyncRequestFutureImpl ars = (MyAsyncRequestFutureImpl) req;
         Map<ServerName, List<Long>> requestHeapSize = ars.getRequestHeapSize();
         for (Map.Entry<ServerName, List<Long>> entry : requestHeapSize.entrySet()) {
           long sum = 0;