Posted to commits@hbase.apache.org by nd...@apache.org on 2015/10/08 02:05:41 UTC

hbase git commit: HBASE-12911 Client-side metrics

Repository: hbase
Updated Branches:
  refs/heads/master e1fd3526b -> 7e30436e3


HBASE-12911 Client-side metrics


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7e30436e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7e30436e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7e30436e

Branch: refs/heads/master
Commit: 7e30436e3fa84525b85b05b9e23cb01b2ada7c12
Parents: e1fd352
Author: Nick Dimiduk <nd...@apache.org>
Authored: Mon Oct 5 10:19:40 2015 -0700
Committer: Nick Dimiduk <nd...@apache.org>
Committed: Wed Oct 7 17:01:56 2015 -0700

----------------------------------------------------------------------
 hbase-client/pom.xml                            |   4 +
 .../hadoop/hbase/client/ClusterConnection.java  |   5 +
 .../apache/hadoop/hbase/client/Connection.java  |   1 -
 .../hbase/client/ConnectionImplementation.java  |  27 +-
 .../apache/hadoop/hbase/client/MetaCache.java   |   9 +
 .../hadoop/hbase/client/MetricsConnection.java  | 324 +++++++++++++++++++
 .../hadoop/hbase/ipc/AbstractRpcClient.java     |  25 +-
 .../org/apache/hadoop/hbase/ipc/AsyncCall.java  |   6 +-
 .../hadoop/hbase/ipc/AsyncRpcChannel.java       |   9 +-
 .../apache/hadoop/hbase/ipc/AsyncRpcClient.java |  96 ++++--
 .../hbase/ipc/AsyncServerResponseHandler.java   |   5 +-
 .../java/org/apache/hadoop/hbase/ipc/Call.java  |  11 +-
 .../hadoop/hbase/ipc/RpcClientFactory.java      |  22 +-
 .../apache/hadoop/hbase/ipc/RpcClientImpl.java  |  50 ++-
 .../hbase/client/TestMetricsConnection.java     | 120 +++++++
 .../hbase/regionserver/HRegionServer.java       |   2 +-
 .../hbase/regionserver/MetricsRegionServer.java |   3 +-
 .../hadoop/hbase/client/TestClientTimeouts.java |   7 +-
 .../hadoop/hbase/ipc/AbstractTestIPC.java       |  18 +-
 .../apache/hadoop/hbase/ipc/TestAsyncIPC.java   |  14 +-
 .../hbase/ipc/TestGlobalEventLoopGroup.java     |   6 +-
 .../org/apache/hadoop/hbase/ipc/TestIPC.java    |   4 +-
 .../hadoop/hbase/ipc/TestRpcClientLeaks.java    |   6 +-
 .../hbase/ipc/TestRpcHandlerException.java      |   3 +-
 24 files changed, 667 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index 425bd05..401e28e 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -189,6 +189,10 @@
       <artifactId>log4j</artifactId>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>com.yammer.metrics</groupId>
+      <artifactId>metrics-core</artifactId>
+    </dependency>
   </dependencies>
 
   <profiles>
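
The new metrics-core dependency (declared here without a version, so presumably managed by the parent pom) supplies the MetricsRegistry, Timer, Histogram, Counter, and JmxReporter types that the MetricsConnection class added below builds on. A standalone sketch of that registry/reporter lifecycle, using only calls that appear in this patch (the class and metric names are purely for the demo):

  // Demo of the Yammer metrics-core primitives used by the new MetricsConnection.
  // Class name and metric names below are illustrative only.
  import java.util.concurrent.TimeUnit;
  import com.yammer.metrics.core.Histogram;
  import com.yammer.metrics.core.MetricsRegistry;
  import com.yammer.metrics.core.Timer;
  import com.yammer.metrics.reporting.JmxReporter;

  public class YammerMetricsDemo {
    public static void main(String[] args) {
      MetricsRegistry registry = new MetricsRegistry();
      JmxReporter reporter = new JmxReporter(registry);
      reporter.start();                      // publish registered metrics as MBeans

      Timer timer = registry.newTimer(YammerMetricsDemo.class,
          "rpcCallDurationMs_Demo", "demo-scope");
      Histogram sizes = registry.newHistogram(YammerMetricsDemo.class,
          "rpcCallRequestSizeBytes_Demo", "demo-scope");
      timer.update(42, TimeUnit.MILLISECONDS);
      sizes.update(1024);

      reporter.shutdown();                   // mirrors MetricsConnection#shutdown()
      registry.shutdown();
    }
  }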

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
index b3d99ae..99071fa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -297,4 +297,9 @@ public interface ClusterConnection extends HConnection {
    */
   ClientBackoffPolicy getBackoffPolicy();
 
+  /**
+   * @return the MetricsConnection instance associated with this connection.
+   */
+  public MetricsConnection getConnectionMetrics();
+
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
index dab4905..a3f6fe6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
@@ -174,5 +174,4 @@ public interface Connection extends Abortable, Closeable {
    * @return true if this connection is closed
    */
   boolean isClosed();
-
 }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index fbb77dc..9f03184 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -18,6 +18,8 @@
  */
 package org.apache.hadoop.hbase.client;
 
+import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.BlockingRpcChannel;
 import com.google.protobuf.RpcController;
@@ -165,11 +167,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
   // Client rpc instance.
   private RpcClient rpcClient;
 
-  private MetaCache metaCache = new MetaCache();
+  private final MetaCache metaCache;
+  private final MetricsConnection metrics;
 
   private int refCount;
 
-  private User user;
+  protected User user;
 
   private RpcRetryingCallerFactory rpcCallerFactory;
 
@@ -198,11 +201,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
         HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
     this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
-      HConstants.DEFAULT_USE_META_REPLICAS);
+        HConstants.DEFAULT_USE_META_REPLICAS);
     this.numTries = tableConfig.getRetriesNumber();
     this.rpcTimeout = conf.getInt(
-      HConstants.HBASE_RPC_TIMEOUT_KEY,
-      HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+        HConstants.HBASE_RPC_TIMEOUT_KEY,
+        HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
     if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
       synchronized (nonceGeneratorCreateLock) {
         if (nonceGenerator == null) {
@@ -219,6 +222,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
     this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
     this.asyncProcess = createAsyncProcess(this.conf);
+    if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
+      this.metrics = new MetricsConnection(this);
+    } else {
+      this.metrics = null;
+    }
+    this.metaCache = new MetaCache(this.metrics);
 
     boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
         HConstants.STATUS_PUBLISHED_DEFAULT);
@@ -377,6 +386,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     return new HBaseAdmin(this);
   }
 
+  @Override
+  public MetricsConnection getConnectionMetrics() {
+    return this.metrics;
+  }
+
   private ExecutorService getBatchPool() {
     if (batchPool == null) {
       synchronized (this) {
@@ -2140,6 +2154,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
     }
     closeMaster();
     shutdownPools();
+    if (this.metrics != null) {
+      this.metrics.shutdown();
+    }
     this.closed = true;
     closeZooKeeperWatcher();
     this.stubs.clear();
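
As the hunks above show, the MetricsConnection (and the JmxReporter it starts) is only created when the new configuration key is set; otherwise the connection carries a null metrics reference and the MetaCache and RPC clients skip all accounting. A minimal sketch of enabling the feature from application code (the wrapper class here is illustrative, not part of the patch):

  // Illustrative: enable client-side metrics before building a Connection.
  // "hbase.client.metrics.enable" is MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY
  // and defaults to false.
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;

  public class EnableClientMetricsExample {
    public static void main(String[] args) throws Exception {
      Configuration conf = HBaseConfiguration.create();
      conf.setBoolean("hbase.client.metrics.enable", true);
      try (Connection connection = ConnectionFactory.createConnection(conf)) {
        // Issue Gets/Puts/Scans here; per-RPC timers and histograms are
        // registered by MetricsConnection and published over JMX.
      }
      // The connection's close path also calls MetricsConnection#shutdown(),
      // stopping the reporter it started.
    }
  }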

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
index 8e1c93c..b23ca70 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
@@ -59,6 +59,12 @@ public class MetaCache {
   // The access to this attribute must be protected by a lock on cachedRegionLocations
   private final Set<ServerName> cachedServers = new ConcurrentSkipListSet<ServerName>();
 
+  private final MetricsConnection metrics;
+
+  public MetaCache(MetricsConnection metrics) {
+    this.metrics = metrics;
+  }
+
   /**
    * Search the cache for a location that fits our table and row key.
    * Return null if no suitable region is located.
@@ -74,6 +80,7 @@ public class MetaCache {
 
     Entry<byte[], RegionLocations> e = tableLocations.floorEntry(row);
     if (e == null) {
+      if (metrics != null) metrics.incrMetaCacheMiss();
       return null;
     }
     RegionLocations possibleRegion = e.getValue();
@@ -94,10 +101,12 @@ public class MetaCache {
     // HConstants.EMPTY_END_ROW) check itself will pass.
     if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
         Bytes.compareTo(endKey, 0, endKey.length, row, 0, row.length) > 0) {
+      if (metrics != null) metrics.incrMetaCacheHit();
       return possibleRegion;
     }
 
     // Passed all the way through, so we got nothing - complete cache miss
+    if (metrics != null) metrics.incrMetaCacheMiss();
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
new file mode 100644
index 0000000..f34fb8a
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.reporting.JmxReporter;
+import com.yammer.metrics.util.RatioGauge;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is for maintaining the various connection statistics and publishing them through
+ * the metrics interfaces.
+ *
+ * This class manages its own {@link MetricsRegistry} and {@link JmxReporter} so as to not
+ * conflict with other uses of Yammer Metrics within the client application. Instantiating
+ * this class implicitly creates and "starts" instances of these classes; be sure to call
+ * {@link #shutdown()} to terminate the thread pools they allocate.
+ */
+@InterfaceAudience.Private
+public class MetricsConnection {
+
+  /** Set this key to {@code true} to enable metrics collection of client requests. */
+  public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
+
+  private static final String DRTN_BASE = "rpcCallDurationMs_";
+  private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
+  private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
+  private static final String CLIENT_SVC = ClientService.getDescriptor().getName();
+
+  /** A container class for collecting details about the RPC call as it percolates. */
+  public static class CallStats {
+    private long requestSizeBytes = 0;
+    private long responseSizeBytes = 0;
+    private long startTime = 0;
+    private long callTimeMs = 0;
+
+    public long getRequestSizeBytes() {
+      return requestSizeBytes;
+    }
+
+    public void setRequestSizeBytes(long requestSizeBytes) {
+      this.requestSizeBytes = requestSizeBytes;
+    }
+
+    public long getResponseSizeBytes() {
+      return responseSizeBytes;
+    }
+
+    public void setResponseSizeBytes(long responseSizeBytes) {
+      this.responseSizeBytes = responseSizeBytes;
+    }
+
+    public long getStartTime() {
+      return startTime;
+    }
+
+    public void setStartTime(long startTime) {
+      this.startTime = startTime;
+    }
+
+    public long getCallTimeMs() {
+      return callTimeMs;
+    }
+
+    public void setCallTimeMs(long callTimeMs) {
+      this.callTimeMs = callTimeMs;
+    }
+  }
+
+  @VisibleForTesting
+  protected final class CallTracker {
+    private final String name;
+    @VisibleForTesting final Timer callTimer;
+    @VisibleForTesting final Histogram reqHist;
+    @VisibleForTesting final Histogram respHist;
+
+    private CallTracker(MetricsRegistry registry, String name, String subName, String scope) {
+      StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
+      if (subName != null) {
+        sb.append("(").append(subName).append(")");
+      }
+      this.name = sb.toString();
+      this.callTimer = registry.newTimer(MetricsConnection.class, DRTN_BASE + this.name, scope);
+      this.reqHist = registry.newHistogram(MetricsConnection.class, REQ_BASE + this.name, scope);
+      this.respHist = registry.newHistogram(MetricsConnection.class, RESP_BASE + this.name, scope);
+    }
+
+    private CallTracker(MetricsRegistry registry, String name, String scope) {
+      this(registry, name, null, scope);
+    }
+
+    public void updateRpc(CallStats stats) {
+      this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
+      this.reqHist.update(stats.getRequestSizeBytes());
+      this.respHist.update(stats.getResponseSizeBytes());
+    }
+
+    @Override
+    public String toString() {
+      return "CallTracker:" + name;
+    }
+  }
+
+  /** A lambda for dispatching to the appropriate metric factory method */
+  private static interface NewMetric<T> {
+    T newMetric(Class<?> clazz, String name, String scope);
+  }
+
+  /** Anticipated number of metric entries */
+  private static final int CAPACITY = 50;
+  /** Default load factor from {@link java.util.HashMap#DEFAULT_LOAD_FACTOR} */
+  private static final float LOAD_FACTOR = 0.75f;
+  /**
+   * Anticipated number of concurrent accessor threads, from
+   * {@link ConnectionImplementation#getBatchPool()}
+   */
+  private static final int CONCURRENCY_LEVEL = 256;
+
+  private final MetricsRegistry registry;
+  private final JmxReporter reporter;
+  private final String scope;
+
+  private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
+    @Override public Timer newMetric(Class<?> clazz, String name, String scope) {
+      return registry.newTimer(clazz, name, scope);
+    }
+  };
+
+  private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
+    @Override public Histogram newMetric(Class<?> clazz, String name, String scope) {
+      return registry.newHistogram(clazz, name, scope);
+    }
+  };
+
+  // static metrics
+
+  @VisibleForTesting protected final Counter metaCacheHits;
+  @VisibleForTesting protected final Counter metaCacheMisses;
+  @VisibleForTesting protected final CallTracker getTracker;
+  @VisibleForTesting protected final CallTracker scanTracker;
+  @VisibleForTesting protected final CallTracker appendTracker;
+  @VisibleForTesting protected final CallTracker deleteTracker;
+  @VisibleForTesting protected final CallTracker incrementTracker;
+  @VisibleForTesting protected final CallTracker putTracker;
+  @VisibleForTesting protected final CallTracker multiTracker;
+
+  // dynamic metrics
+
+  // These maps are used to cache references to the metric instances that are managed by the
+  // registry. I don't think their use perfectly removes redundant allocations, but it's
+  // a big improvement over calling registry.newMetric each time.
+  @VisibleForTesting protected final ConcurrentMap<String, Timer> rpcTimers =
+      new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
+  @VisibleForTesting protected final ConcurrentMap<String, Histogram> rpcHistograms =
+      new ConcurrentHashMap<>(CAPACITY * 2 /* tracking both request and response sizes */,
+          LOAD_FACTOR, CONCURRENCY_LEVEL);
+
+  public MetricsConnection(final ConnectionImplementation conn) {
+    this.scope = conn.toString();
+    this.registry = new MetricsRegistry();
+    final ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool();
+    final ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool();
+
+    this.registry.newGauge(this.getClass(), "executorPoolActiveThreads", scope,
+        new RatioGauge() {
+          @Override protected double getNumerator() {
+            return batchPool.getActiveCount();
+          }
+          @Override protected double getDenominator() {
+            return batchPool.getMaximumPoolSize();
+          }
+        });
+    this.registry.newGauge(this.getClass(), "metaPoolActiveThreads", scope,
+        new RatioGauge() {
+          @Override protected double getNumerator() {
+            return metaPool.getActiveCount();
+          }
+          @Override protected double getDenominator() {
+            return metaPool.getMaximumPoolSize();
+          }
+        });
+    this.metaCacheHits = registry.newCounter(this.getClass(), "metaCacheHits", scope);
+    this.metaCacheMisses = registry.newCounter(this.getClass(), "metaCacheMisses", scope);
+    this.getTracker = new CallTracker(this.registry, "Get", scope);
+    this.scanTracker = new CallTracker(this.registry, "Scan", scope);
+    this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
+    this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
+    this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
+    this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
+    this.multiTracker = new CallTracker(this.registry, "Multi", scope);
+    this.reporter = new JmxReporter(this.registry);
+    this.reporter.start();
+  }
+
+  public void shutdown() {
+    this.reporter.shutdown();
+    this.registry.shutdown();
+  }
+
+  /** Produce an instance of {@link CallStats} for clients to attach to RPCs. */
+  public static CallStats newCallStats() {
+    // TODO: instance pool to reduce GC?
+    return new CallStats();
+  }
+
+  /** Increment the number of meta cache hits. */
+  public void incrMetaCacheHit() {
+    metaCacheHits.inc();
+  }
+
+  /** Increment the number of meta cache misses. */
+  public void incrMetaCacheMiss() {
+    metaCacheMisses.inc();
+  }
+
+  /**
+   * Get a metric for {@code key} from {@code map}, or create it with {@code factory}.
+   */
+  private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
+    T t = map.get(key);
+    if (t == null) {
+      t = factory.newMetric(this.getClass(), key, scope);
+      map.putIfAbsent(key, t);
+    }
+    return t;
+  }
+
+  /** Update call stats for non-critical-path methods */
+  private void updateRpcGeneric(MethodDescriptor method, CallStats stats) {
+    final String methodName = method.getService().getName() + "_" + method.getName();
+    getMetric(DRTN_BASE + methodName, rpcTimers, timerFactory)
+        .update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
+    getMetric(REQ_BASE + methodName, rpcHistograms, histogramFactory)
+        .update(stats.getRequestSizeBytes());
+    getMetric(RESP_BASE + methodName, rpcHistograms, histogramFactory)
+        .update(stats.getResponseSizeBytes());
+  }
+
+  /** Report RPC context to metrics system. */
+  public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
+    // This implementation is tied directly to protobuf implementation details. It would be better
+    // if we could dispatch based on something static, i.e., the request Message type.
+    if (method.getService() == ClientService.getDescriptor()) {
+      switch(method.getIndex()) {
+      case 0:
+        assert "Get".equals(method.getName());
+        getTracker.updateRpc(stats);
+        return;
+      case 1:
+        assert "Mutate".equals(method.getName());
+        final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
+        switch(mutationType) {
+        case APPEND:
+          appendTracker.updateRpc(stats);
+          return;
+        case DELETE:
+          deleteTracker.updateRpc(stats);
+          return;
+        case INCREMENT:
+          incrementTracker.updateRpc(stats);
+          return;
+        case PUT:
+          putTracker.updateRpc(stats);
+          return;
+        default:
+          throw new RuntimeException("Unrecognized mutation type " + mutationType);
+        }
+      case 2:
+        assert "Scan".equals(method.getName());
+        scanTracker.updateRpc(stats);
+        return;
+      case 3:
+        assert "BulkLoadHFile".equals(method.getName());
+        // use generic implementation
+        break;
+      case 4:
+        assert "ExecService".equals(method.getName());
+        // use generic implementation
+        break;
+      case 5:
+        assert "ExecRegionServerService".equals(method.getName());
+        // use generic implementation
+        break;
+      case 6:
+        assert "Multi".equals(method.getName());
+        multiTracker.updateRpc(stats);
+        return;
+      default:
+        throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
+      }
+    }
+    // Fallback to dynamic registry lookup for DDL methods.
+    updateRpcGeneric(method, stats);
+  }
+}
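
The class Javadoc and the newCallStats()/updateRpc() pair define the intended lifecycle: allocate a CallStats per RPC, stamp the start time, record request/response sizes as they become known, compute the call time, then hand the stats back for dispatch to the matching tracker (or to the dynamic registry for everything else). A compressed sketch of that pattern, using only calls taken from this patch (the wrapper class, method, and parameters are hypothetical):

  // Illustrative fragment of the per-call accounting the patched RPC clients perform.
  import com.google.protobuf.Descriptors.MethodDescriptor;
  import com.google.protobuf.Message;
  import org.apache.hadoop.hbase.client.MetricsConnection;
  import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;

  final class CallStatsPattern {
    static void recordCall(MetricsConnection metrics, MethodDescriptor md, Message param,
        long requestSizeBytes, long responseSizeBytes) {
      MetricsConnection.CallStats stats = MetricsConnection.newCallStats();
      stats.setStartTime(EnvironmentEdgeManager.currentTime());
      // ... the RPC runs here ...
      stats.setRequestSizeBytes(requestSizeBytes);   // e.g. the return value of IPCUtil.write(...)
      stats.setResponseSizeBytes(responseSizeBytes); // e.g. totalSize from the response header
      stats.setCallTimeMs(EnvironmentEdgeManager.currentTime() - stats.getStartTime());
      if (metrics != null) {                         // metrics are optional on the connection
        metrics.updateRpc(md, param, stats);         // Get/Mutate/Scan/Multi trackers,
      }                                              // otherwise the dynamic registry
    }
  }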

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 9be370d..6f5e78a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.codec.KeyValueCodec;
 import org.apache.hadoop.hbase.security.User;
@@ -55,6 +56,7 @@ public abstract class AbstractRpcClient implements RpcClient {
   protected final Configuration conf;
   protected String clusterId;
   protected final SocketAddress localAddr;
+  protected final MetricsConnection metrics;
 
   protected UserProvider userProvider;
   protected final IPCUtil ipcUtil;
@@ -79,8 +81,10 @@ public abstract class AbstractRpcClient implements RpcClient {
    * @param conf configuration
    * @param clusterId the cluster id
    * @param localAddr client socket bind address.
+   * @param metrics the connection metrics
    */
-  public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
+  public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
+      MetricsConnection metrics) {
     this.userProvider = UserProvider.instantiate(conf);
     this.localAddr = localAddr;
     this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
@@ -100,6 +104,7 @@ public abstract class AbstractRpcClient implements RpcClient {
     this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
     this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
     this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
+    this.metrics = metrics;
 
     // login the server principal (if using secure Hadoop)
     if (LOG.isDebugEnabled()) {
@@ -205,19 +210,20 @@ public abstract class AbstractRpcClient implements RpcClient {
       pcrc = new PayloadCarryingRpcController();
     }
 
-    long startTime = 0;
-    if (LOG.isTraceEnabled()) {
-      startTime = EnvironmentEdgeManager.currentTime();
-    }
     Pair<Message, CellScanner> val;
     try {
-      val = call(pcrc, md, param, returnType, ticket, isa);
+      final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
+      cs.setStartTime(EnvironmentEdgeManager.currentTime());
+      val = call(pcrc, md, param, returnType, ticket, isa, cs);
       // Shove the results into controller so can be carried across the proxy/pb service void.
       pcrc.setCellScanner(val.getSecond());
 
+      cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
+      if (metrics != null) {
+        metrics.updateRpc(md, param, cs);
+      }
       if (LOG.isTraceEnabled()) {
-        long callTime = EnvironmentEdgeManager.currentTime() - startTime;
-        LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
+        LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
       }
       return val.getFirst();
     } catch (Throwable e) {
@@ -242,7 +248,8 @@ public abstract class AbstractRpcClient implements RpcClient {
    */
   protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
       Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
-      InetSocketAddress isa) throws IOException, InterruptedException;
+      InetSocketAddress isa, MetricsConnection.CallStats callStats)
+      throws IOException, InterruptedException;
 
   @Override
   public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
index 431c669..a5da0dc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.ExceptionUtil;
@@ -49,6 +50,7 @@ public class AsyncCall extends DefaultPromise<Message> {
   final Message responseDefaultType;
   final long startTime;
   final long rpcTimeout;
+  final MetricsConnection.CallStats callStats;
 
   /**
    * Constructor
@@ -61,7 +63,8 @@ public class AsyncCall extends DefaultPromise<Message> {
    * @param responseDefaultType the default response type
    */
   public AsyncCall(EventLoop eventLoop, int connectId, Descriptors.MethodDescriptor md, Message
-      param, PayloadCarryingRpcController controller, Message responseDefaultType) {
+      param, PayloadCarryingRpcController controller, Message responseDefaultType,
+      MetricsConnection.CallStats callStats) {
     super(eventLoop);
 
     this.id = connectId;
@@ -73,6 +76,7 @@ public class AsyncCall extends DefaultPromise<Message> {
 
     this.startTime = EnvironmentEdgeManager.currentTime();
     this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0;
+    this.callStats = callStats;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 43d75f9..44e8322 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -49,6 +49,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
@@ -310,10 +311,10 @@ public class AsyncRpcChannel {
    */
   public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
       final PayloadCarryingRpcController controller, final Message request,
-      final Message responsePrototype) {
+      final Message responsePrototype, MetricsConnection.CallStats callStats) {
     final AsyncCall call =
         new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request,
-            controller, responsePrototype);
+            controller, responsePrototype, callStats);
     controller.notifyOnCancel(new RpcCallback<Object>() {
       @Override
       public void run(Object parameter) {
@@ -433,7 +434,7 @@ public class AsyncRpcChannel {
 
       ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
       try(ByteBufOutputStream out = new ByteBufOutputStream(b)) {
-        IPCUtil.write(out, rh, call.param, cellBlock);
+        call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
       }
 
       channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
@@ -579,8 +580,6 @@ public class AsyncRpcChannel {
 
   /**
    * Clean up calls.
-   *
-   * @param cleanAll true if all calls should be cleaned, false for only the timed out calls
    */
   private void cleanupCalls() {
     List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
index e1662f3..60e9add 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
@@ -52,7 +52,9 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVM;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.hbase.util.PoolMap;
@@ -146,12 +148,13 @@ public class AsyncRpcClient extends AbstractRpcClient {
    * @param configuration      to HBase
    * @param clusterId          for the cluster
    * @param localAddress       local address to connect to
+   * @param metrics            the connection metrics
    * @param channelInitializer for custom channel handlers
    */
-  @VisibleForTesting
-  AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
+  protected AsyncRpcClient(Configuration configuration, String clusterId,
+      SocketAddress localAddress, MetricsConnection metrics,
       ChannelInitializer<SocketChannel> channelInitializer) {
-    super(configuration, clusterId, localAddress);
+    super(configuration, clusterId, localAddress, metrics);
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Starting async Hbase RPC client");
@@ -191,15 +194,28 @@ public class AsyncRpcClient extends AbstractRpcClient {
     }
   }
 
+  /** Used in test only. */
+  AsyncRpcClient(Configuration configuration) {
+    this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
+  }
+
+  /** Used in test only. */
+  AsyncRpcClient(Configuration configuration,
+      ChannelInitializer<SocketChannel> channelInitializer) {
+    this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, channelInitializer);
+  }
+
   /**
    * Constructor
    *
    * @param configuration to HBase
    * @param clusterId     for the cluster
    * @param localAddress  local address to connect to
+   * @param metrics       the connection metrics
    */
-  public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress) {
-    this(configuration, clusterId, localAddress, null);
+  public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
+      MetricsConnection metrics) {
+    this(configuration, clusterId, localAddress, metrics, null);
   }
 
   /**
@@ -219,13 +235,14 @@ public class AsyncRpcClient extends AbstractRpcClient {
   @Override
   protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
       Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
-      InetSocketAddress addr) throws IOException, InterruptedException {
+      InetSocketAddress addr, MetricsConnection.CallStats callStats)
+      throws IOException, InterruptedException {
     if (pcrc == null) {
       pcrc = new PayloadCarryingRpcController();
     }
     final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
 
-    Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType);
+    Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType, callStats);
     long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
     try {
       Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
@@ -244,40 +261,49 @@ public class AsyncRpcClient extends AbstractRpcClient {
   /**
    * Call method async
    */
-  private void callMethod(Descriptors.MethodDescriptor md, final PayloadCarryingRpcController pcrc,
-      Message param, Message returnType, User ticket, InetSocketAddress addr,
-      final RpcCallback<Message> done) {
+  private void callMethod(final Descriptors.MethodDescriptor md,
+      final PayloadCarryingRpcController pcrc, final Message param, Message returnType, User ticket,
+      InetSocketAddress addr, final RpcCallback<Message> done) {
     final AsyncRpcChannel connection;
     try {
       connection = createRpcChannel(md.getService().getName(), addr, ticket);
-
-      connection.callMethod(md, pcrc, param, returnType).addListener(
+      final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
+      GenericFutureListener<Future<Message>> listener =
           new GenericFutureListener<Future<Message>>() {
-        @Override
-        public void operationComplete(Future<Message> future) throws Exception {
-          if(!future.isSuccess()){
-            Throwable cause = future.cause();
-            if (cause instanceof IOException) {
-              pcrc.setFailed((IOException) cause);
-            }else{
-              pcrc.setFailed(new IOException(cause));
-            }
-          }else{
-            try {
-              done.run(future.get());
-            }catch (ExecutionException e){
-              Throwable cause = e.getCause();
-              if (cause instanceof IOException) {
-                pcrc.setFailed((IOException) cause);
-              }else{
-                pcrc.setFailed(new IOException(cause));
+            @Override
+            public void operationComplete(Future<Message> future) throws Exception {
+              cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
+              if (metrics != null) {
+                metrics.updateRpc(md, param, cs);
+              }
+              if (LOG.isTraceEnabled()) {
+                LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
+              }
+              if (!future.isSuccess()) {
+                Throwable cause = future.cause();
+                if (cause instanceof IOException) {
+                  pcrc.setFailed((IOException) cause);
+                } else {
+                  pcrc.setFailed(new IOException(cause));
+                }
+              } else {
+                try {
+                  done.run(future.get());
+                } catch (ExecutionException e) {
+                  Throwable cause = e.getCause();
+                  if (cause instanceof IOException) {
+                    pcrc.setFailed((IOException) cause);
+                  } else {
+                    pcrc.setFailed(new IOException(cause));
+                  }
+                } catch (InterruptedException e) {
+                  pcrc.setFailed(new IOException(e));
+                }
               }
-            }catch (InterruptedException e){
-              pcrc.setFailed(new IOException(e));
             }
-          }
-        }
-      });
+          };
+      cs.setStartTime(EnvironmentEdgeManager.currentTime());
+      connection.callMethod(md, pcrc, param, returnType, cs).addListener(listener);
     } catch (StoppedRpcClientException|FailedServerException e) {
       pcrc.setFailed(e);
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
index f7aa8a9..8f6c85b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
@@ -24,8 +24,6 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -39,8 +37,6 @@ import com.google.protobuf.Message;
  */
 @InterfaceAudience.Private
 public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
-  private static final Log LOG = LogFactory.getLog(AsyncServerResponseHandler.class.getName());
-
   private final AsyncRpcChannel channel;
 
   /**
@@ -102,6 +98,7 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
           cellBlockScanner = channel.client.createCellScanner(cellBlock);
         }
         call.setSuccess(value, cellBlockScanner);
+        call.callStats.setResponseSizeBytes(totalSize);
       }
     } catch (IOException e) {
       // Treat this as a fatal condition and close this connection

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
index df32730..5f90837 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
@@ -21,6 +21,7 @@ import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
 import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 
@@ -41,16 +42,18 @@ public class Call {
   Message responseDefaultType;
   IOException error;                            // exception, null if value
   volatile boolean done;                                 // true when call is done
-  long startTime;
   final Descriptors.MethodDescriptor md;
   final int timeout; // timeout in millisecond for this call; 0 means infinite.
+  final MetricsConnection.CallStats callStats;
 
   protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
-      final CellScanner cells, final Message responseDefaultType, int timeout) {
+      final CellScanner cells, final Message responseDefaultType, int timeout,
+      MetricsConnection.CallStats callStats) {
     this.param = param;
     this.md = md;
     this.cells = cells;
-    this.startTime = EnvironmentEdgeManager.currentTime();
+    this.callStats = callStats;
+    this.callStats.setStartTime(EnvironmentEdgeManager.currentTime());
     this.responseDefaultType = responseDefaultType;
     this.id = id;
     this.timeout = timeout;
@@ -122,6 +125,6 @@ public class Call {
   }
 
   public long getStartTime() {
-    return this.startTime;
+    return this.callStats.getStartTime();
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
index 10ddc56..822daca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
@@ -17,8 +17,10 @@
  */
 package org.apache.hadoop.hbase.ipc;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.util.ReflectionUtils;
 
 import java.net.SocketAddress;
@@ -37,15 +39,23 @@ public final class RpcClientFactory {
   private RpcClientFactory() {
   }
 
+  /** Helper method for tests only. Creates an {@code RpcClient} without metrics. */
+  @VisibleForTesting
+  public static RpcClient createClient(Configuration conf, String clusterId) {
+    return createClient(conf, clusterId, null);
+  }
+
   /**
    * Creates a new RpcClient by the class defined in the configuration or falls back to
    * RpcClientImpl
    * @param conf configuration
    * @param clusterId the cluster id
+   * @param metrics the connection metrics
    * @return newly created RpcClient
    */
-  public static RpcClient createClient(Configuration conf, String clusterId) {
-    return createClient(conf, clusterId, null);
+  public static RpcClient createClient(Configuration conf, String clusterId,
+      MetricsConnection metrics) {
+    return createClient(conf, clusterId, null, metrics);
   }
 
   /**
@@ -54,16 +64,18 @@ public final class RpcClientFactory {
    * @param conf configuration
    * @param clusterId the cluster id
    * @param localAddr client socket bind address.
+   * @param metrics the connection metrics
    * @return newly created RpcClient
    */
   public static RpcClient createClient(Configuration conf, String clusterId,
-      SocketAddress localAddr) {
+      SocketAddress localAddr, MetricsConnection metrics) {
     String rpcClientClass =
         conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, AsyncRpcClient.class.getName());
     return ReflectionUtils.instantiateWithCustomCtor(
         rpcClientClass,
-        new Class[] { Configuration.class, String.class, SocketAddress.class },
-        new Object[] { conf, clusterId, localAddr }
+        new Class[] { Configuration.class, String.class, SocketAddress.class,
+            MetricsConnection.class },
+        new Object[] { conf, clusterId, localAddr, metrics }
     );
   }
 }
\ No newline at end of file
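
With these signatures, callers pass the connection's (possibly null) MetricsConnection through to whichever RpcClient implementation the configuration selects via reflection. A sketch of the wiring a caller such as ConnectionImplementation performs (the wrapper names here are illustrative):

  // Illustrative: threading the optional MetricsConnection through the factory.
  import java.net.SocketAddress;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.client.MetricsConnection;
  import org.apache.hadoop.hbase.ipc.RpcClient;
  import org.apache.hadoop.hbase.ipc.RpcClientFactory;

  final class RpcClientWiring {
    static RpcClient newRpcClient(Configuration conf, String clusterId,
        SocketAddress localAddr, MetricsConnection metrics /* may be null */) {
      // Instantiates AsyncRpcClient by default, or the class named by
      // RpcClientFactory.CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, using the new
      // 4-argument (conf, clusterId, localAddr, metrics) constructor.
      return RpcClientFactory.createClient(conf, clusterId, localAddr, metrics);
    }
  }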

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index cb18952..3fb7061 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -19,6 +19,7 @@
 
 package org.apache.hadoop.hbase.ipc;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.protobuf.Descriptors.MethodDescriptor;
 import com.google.protobuf.Message;
 import com.google.protobuf.Message.Builder;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.codec.Codec;
 import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
 import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -911,7 +913,8 @@ public class RpcClientImpl extends AbstractRpcClient {
         checkIsOpen(); // Now we're checking that it didn't became idle in between.
 
         try {
-          IPCUtil.write(this.out, header, call.param, cellBlock);
+          call.callStats.setRequestSizeBytes(
+              IPCUtil.write(this.out, header, call.param, cellBlock));
         } catch (IOException e) {
           // We set the value inside the synchronized block, this way the next in line
           //  won't even try to write. Otherwise we might miss a call in the calls map?
@@ -964,12 +967,20 @@ public class RpcClientImpl extends AbstractRpcClient {
           int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
           int whatIsLeftToRead = totalSize - readSoFar;
           IOUtils.skipFully(in, whatIsLeftToRead);
+          if (call != null) {
+            call.callStats.setResponseSizeBytes(totalSize);
+            call.callStats.setCallTimeMs(
+                EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
+          }
           return;
         }
         if (responseHeader.hasException()) {
           ExceptionResponse exceptionResponse = responseHeader.getException();
           RemoteException re = createRemoteException(exceptionResponse);
           call.setException(re);
+          call.callStats.setResponseSizeBytes(totalSize);
+          call.callStats.setCallTimeMs(
+              EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
           if (isFatalConnectionException(exceptionResponse)) {
             markClosed(re);
           }
@@ -988,6 +999,9 @@ public class RpcClientImpl extends AbstractRpcClient {
             cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
           }
           call.setResponse(value, cellBlockScanner);
+          call.callStats.setResponseSizeBytes(totalSize);
+          call.callStats.setCallTimeMs(
+              EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
         }
       } catch (IOException e) {
         if (expectedCall) call.setException(e);
@@ -1075,13 +1089,15 @@ public class RpcClientImpl extends AbstractRpcClient {
   }
 
   /**
-   * Construct an IPC cluster client whose values are of the {@link Message} class.
+   * Used in test only. Construct an IPC cluster client whose values are of the
+   * {@link Message} class.
    * @param conf configuration
    * @param clusterId the cluster id
    * @param factory socket factory
    */
+  @VisibleForTesting
   RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory) {
-    this(conf, clusterId, factory, null);
+    this(conf, clusterId, factory, null, null);
   }
 
   /**
@@ -1090,10 +1106,11 @@ public class RpcClientImpl extends AbstractRpcClient {
    * @param clusterId the cluster id
    * @param factory socket factory
    * @param localAddr client socket bind address
+   * @param metrics the connection metrics
    */
   RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory,
-      SocketAddress localAddr) {
-    super(conf, clusterId, localAddr);
+      SocketAddress localAddr, MetricsConnection metrics) {
+    super(conf, clusterId, localAddr, metrics);
 
     this.socketFactory = factory;
     this.connections = new PoolMap<ConnectionId, Connection>(getPoolType(conf), getPoolSize(conf));
@@ -1101,25 +1118,27 @@ public class RpcClientImpl extends AbstractRpcClient {
   }
 
   /**
-   * Construct an IPC client for the cluster <code>clusterId</code> with the default SocketFactory
-   * @param conf configuration
-   * @param clusterId the cluster id
+   * Used in test only. Construct an IPC client for the cluster {@code clusterId} with
+   * the default SocketFactory
    */
-  public RpcClientImpl(Configuration conf, String clusterId) {
-    this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null);
+  @VisibleForTesting
+  RpcClientImpl(Configuration conf, String clusterId) {
+    this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null, null);
   }
 
   /**
-   * Construct an IPC client for the cluster <code>clusterId</code> with the default SocketFactory
+   * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory
    *
    * This method is called with reflection by the RpcClientFactory to create an instance
    *
    * @param conf configuration
    * @param clusterId the cluster id
    * @param localAddr client socket bind address.
+   * @param metrics the connection metrics
    */
-  public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr) {
-    this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr);
+  public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr,
+      MetricsConnection metrics) {
+    this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr, metrics);
   }
 
   /** Stop all threads related to this client.  No further calls may be made
@@ -1182,7 +1201,8 @@ public class RpcClientImpl extends AbstractRpcClient {
    */
   @Override
   protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
-      Message param, Message returnType, User ticket, InetSocketAddress addr)
+      Message param, Message returnType, User ticket, InetSocketAddress addr,
+      MetricsConnection.CallStats callStats)
       throws IOException, InterruptedException {
     if (pcrc == null) {
       pcrc = new PayloadCarryingRpcController();
@@ -1190,7 +1210,7 @@ public class RpcClientImpl extends AbstractRpcClient {
     CellScanner cells = pcrc.cellScanner();
 
     final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,
-        pcrc.getCallTimeout());
+        pcrc.getCallTimeout(), MetricsConnection.newCallStats());
 
     final Connection connection = getConnection(ticket, call, addr);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
new file mode 100644
index 0000000..88a653e
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MetricsTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+
+@Category({ClientTests.class, MetricsTests.class, SmallTests.class})
+public class TestMetricsConnection {
+
+  private static MetricsConnection METRICS;
+
+  @BeforeClass
+  public static void beforeClass() {
+    ConnectionImplementation mocked = Mockito.mock(ConnectionImplementation.class);
+    Mockito.when(mocked.toString()).thenReturn("mocked-connection");
+    METRICS = new MetricsConnection(mocked);
+  }
+
+  @AfterClass
+  public static void afterClass() {
+    METRICS.shutdown();
+  }
+
+  @Test
+  public void testStaticMetrics() throws IOException {
+    final byte[] foo = Bytes.toBytes("foo");
+    final RegionSpecifier region = RegionSpecifier.newBuilder()
+        .setValue(ByteString.EMPTY)
+        .setType(RegionSpecifierType.REGION_NAME)
+        .build();
+    final int loop = 5;
+
+    for (int i = 0; i < loop; i++) {
+      METRICS.updateRpc(
+          ClientService.getDescriptor().findMethodByName("Get"),
+          GetRequest.getDefaultInstance(),
+          MetricsConnection.newCallStats());
+      METRICS.updateRpc(
+          ClientService.getDescriptor().findMethodByName("Scan"),
+          ScanRequest.getDefaultInstance(),
+          MetricsConnection.newCallStats());
+      METRICS.updateRpc(
+          ClientService.getDescriptor().findMethodByName("Multi"),
+          MultiRequest.getDefaultInstance(),
+          MetricsConnection.newCallStats());
+      METRICS.updateRpc(
+          ClientService.getDescriptor().findMethodByName("Mutate"),
+          MutateRequest.newBuilder()
+              .setMutation(ProtobufUtil.toMutation(MutationType.APPEND, new Append(foo)))
+              .setRegion(region)
+              .build(),
+          MetricsConnection.newCallStats());
+      METRICS.updateRpc(
+          ClientService.getDescriptor().findMethodByName("Mutate"),
+          MutateRequest.newBuilder()
+              .setMutation(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(foo)))
+              .setRegion(region)
+              .build(),
+          MetricsConnection.newCallStats());
+      METRICS.updateRpc(
+          ClientService.getDescriptor().findMethodByName("Mutate"),
+          MutateRequest.newBuilder()
+              .setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, new Increment(foo)))
+              .setRegion(region)
+              .build(),
+          MetricsConnection.newCallStats());
+      METRICS.updateRpc(
+          ClientService.getDescriptor().findMethodByName("Mutate"),
+          MutateRequest.newBuilder()
+              .setMutation(ProtobufUtil.toMutation(MutationType.PUT, new Put(foo)))
+              .setRegion(region)
+              .build(),
+          MetricsConnection.newCallStats());
+    }
+    for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] {
+        METRICS.getTracker, METRICS.scanTracker, METRICS.multiTracker, METRICS.appendTracker,
+        METRICS.deleteTracker, METRICS.incrementTracker, METRICS.putTracker
+    }) {
+      Assert.assertEquals("Failed to invoke callTimer on " + t, loop, t.callTimer.count());
+      Assert.assertEquals("Failed to invoke reqHist on " + t, loop, t.reqHist.count());
+      Assert.assertEquals("Failed to invoke respHist on " + t, loop, t.respHist.count());
+    }
+  }
+}
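
The new test exercises MetricsConnection directly. As a rough standalone sketch of the
same API (not part of this patch; the class name is invented, and it sits in the
org.apache.hadoop.hbase.client package because, like the test, it relies on
package-level access to ConnectionImplementation and the tracker fields):

    package org.apache.hadoop.hbase.client;

    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
    import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
    import org.mockito.Mockito;

    public class MetricsConnectionSketch {
      public static void main(String[] args) {
        // A mock stands in for a real connection, as in TestMetricsConnection above.
        ConnectionImplementation conn = Mockito.mock(ConnectionImplementation.class);
        MetricsConnection metrics = new MetricsConnection(conn);
        // One CallStats per RPC; updateRpc() folds it into the per-method tracker.
        metrics.updateRpc(ClientService.getDescriptor().findMethodByName("Get"),
            GetRequest.getDefaultInstance(), MetricsConnection.newCallStats());
        System.out.println("Get RPCs recorded: " + metrics.getTracker.callTimer.count());
        metrics.shutdown();
      }
    }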

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 7653fa1..b799837 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -851,7 +851,7 @@ public class HRegionServer extends HasThread implements
 
     // Setup RPC client for master communication
     rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
-        rpcServices.isa.getAddress(), 0));
+        rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
 
     boolean onlyMetaRefresh = false;
     int storefileRefreshPeriod = conf.getInt(
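
The HRegionServer change above threads the connection's metrics into the RPC client at
construction time. The same wiring in isolation, as a sketch (the helper class and method
names are invented; everything else comes from the call site above):

    import java.net.InetSocketAddress;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.ClusterConnection;
    import org.apache.hadoop.hbase.client.MetricsConnection;
    import org.apache.hadoop.hbase.ipc.RpcClient;
    import org.apache.hadoop.hbase.ipc.RpcClientFactory;

    final class RpcClientWiring {
      // Hypothetical helper: create an RPC client that reports into the connection's
      // client-side metrics, mirroring the HRegionServer call site above.
      static RpcClient createWithMetrics(Configuration conf, String clusterId,
          InetSocketAddress localAddr, ClusterConnection connection) {
        MetricsConnection metrics = connection.getConnectionMetrics();
        return RpcClientFactory.createClient(conf, clusterId, localAddr, metrics);
      }
    }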

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
index b2cb772..91f494a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.regionserver;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
 import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
@@ -48,7 +49,7 @@ public class MetricsRegionServer {
     this.serverSource = serverSource;
   }
 
-  // for unit-test usage
+  @VisibleForTesting
   public MetricsRegionServerSource getMetricsSource() {
     return serverSource;
   }

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index 418a0ac..8edfd9e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -134,9 +134,10 @@ public class TestClientTimeouts {
   /**
    * Rpc Channel implementation with RandomTimeoutBlockingRpcChannel
    */
-  public static class RandomTimeoutRpcClient extends RpcClientImpl{
-    public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
-      super(conf, clusterId, localAddr);
+  public static class RandomTimeoutRpcClient extends RpcClientImpl {
+    public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
+        MetricsConnection metrics) {
+      super(conf, clusterId, localAddr, metrics);
     }
 
     // Return my own instance, one that does random timeouts

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 32eb9f6..d427419 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.CellScanner;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@@ -163,7 +164,8 @@ public abstract class AbstractTestIPC {
       final String message = "hello";
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
       Pair<Message, CellScanner> r =
-          client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address);
+          client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
+              new MetricsConnection.CallStats());
       assertTrue(r.getSecond() == null);
       // Silly assertion that the message is in the returned pb.
       assertTrue(r.getFirst().toString().contains(message));
@@ -205,7 +207,8 @@ public abstract class AbstractTestIPC {
       PayloadCarryingRpcController pcrc =
           new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
       Pair<Message, CellScanner> r =
-          client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address);
+          client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
+              new MetricsConnection.CallStats());
       int index = 0;
       while (r.getSecond().advance()) {
         assertTrue(CELL.equals(r.getSecond().current()));
@@ -231,7 +234,8 @@ public abstract class AbstractTestIPC {
       InetSocketAddress address = rpcServer.getListenerAddress();
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-      client.call(null, md, param, null, User.getCurrent(), address);
+      client.call(null, md, param, null, User.getCurrent(), address,
+          new MetricsConnection.CallStats());
       fail("Expected an exception to have been thrown!");
     } catch (Exception e) {
       LOG.info("Caught expected exception: " + e.toString());
@@ -255,10 +259,10 @@ public abstract class AbstractTestIPC {
       MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
       EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
       for (int i = 0; i < 10; i++) {
-        client.call(
-          new PayloadCarryingRpcController(
-              CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param, md
-              .getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress());
+        client.call(new PayloadCarryingRpcController(
+            CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
+            md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(),
+            new MetricsConnection.CallStats());
       }
       verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
     } finally {
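
Every call-site change in this test adds the same trailing argument. In isolation, the
updated blocking call pattern looks like the sketch below (class and method names are
invented; it sits in the org.apache.hadoop.hbase.ipc package on the assumption that,
as for the tests, call() is not publicly visible):

    package org.apache.hadoop.hbase.ipc;

    import java.net.InetSocketAddress;

    import com.google.protobuf.Descriptors.MethodDescriptor;
    import com.google.protobuf.Message;

    import org.apache.hadoop.hbase.CellScanner;
    import org.apache.hadoop.hbase.client.MetricsConnection;
    import org.apache.hadoop.hbase.security.User;
    import org.apache.hadoop.hbase.util.Pair;

    final class BlockingCallSketch {
      static Pair<Message, CellScanner> callOnce(RpcClientImpl client, MethodDescriptor md,
          Message param, InetSocketAddress address) throws Exception {
        // A fresh CallStats accompanies every RPC; the client records per-call timing
        // and sizes into it.
        return client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(),
            address, new MetricsConnection.CallStats());
      }
    }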

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
index 18e3798..d761ae9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.codec.Codec;
@@ -116,7 +117,7 @@ public class TestAsyncIPC extends AbstractTestIPC {
   @Override
   protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) {
     setConf(conf);
-    return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null) {
+    return new AsyncRpcClient(conf) {
 
       @Override
       Codec getCodec() {
@@ -129,15 +130,13 @@ public class TestAsyncIPC extends AbstractTestIPC {
   @Override
   protected AsyncRpcClient createRpcClient(Configuration conf) {
     setConf(conf);
-    return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
+    return new AsyncRpcClient(conf);
   }
 
   @Override
   protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
     setConf(conf);
-    return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null,
-        new ChannelInitializer<SocketChannel>() {
-
+    return new AsyncRpcClient(conf, new ChannelInitializer<SocketChannel>() {
           @Override
           protected void initChannel(SocketChannel ch) throws Exception {
             ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@@ -248,7 +247,7 @@ public class TestAsyncIPC extends AbstractTestIPC {
     TestRpcServer rpcServer = new TestRpcServer();
     MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
     EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
-    AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
+    AsyncRpcClient client = new AsyncRpcClient(conf);
     KeyValue kv = BIG_CELL;
     Put p = new Put(CellUtil.cloneRow(kv));
     for (int i = 0; i < cellcount; i++) {
@@ -282,7 +281,8 @@ public class TestAsyncIPC extends AbstractTestIPC {
         PayloadCarryingRpcController pcrc =
             new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
         // Pair<Message, CellScanner> response =
-        client.call(pcrc, md, builder.build(), param, user, address);
+        client.call(pcrc, md, builder.build(), param, user, address,
+            new MetricsConnection.CallStats());
         /*
          * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
          * count);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java
index 60dbd1b..52c77e7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java
@@ -37,15 +37,15 @@ public class TestGlobalEventLoopGroup {
   public void test() {
     Configuration conf = HBaseConfiguration.create();
     conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, true);
-    AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
+    AsyncRpcClient client = new AsyncRpcClient(conf);
     assertNotNull(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP);
-    AsyncRpcClient client1 = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
+    AsyncRpcClient client1 = new AsyncRpcClient(conf);
     assertSame(client.bootstrap.group(), client1.bootstrap.group());
     client1.close();
     assertFalse(client.bootstrap.group().isShuttingDown());
 
     conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, false);
-    AsyncRpcClient client2 = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
+    AsyncRpcClient client2 = new AsyncRpcClient(conf);
     assertNotSame(client.bootstrap.group(), client2.bootstrap.group());
     client2.close();
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
index 67e4e4f..d1b8202 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.codec.Codec;
@@ -145,7 +146,8 @@ public class TestIPC extends AbstractTestIPC {
         PayloadCarryingRpcController pcrc =
             new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
         // Pair<Message, CellScanner> response =
-        client.call(pcrc, md, builder.build(), param, user, address);
+        client.call(pcrc, md, builder.build(), param, user, address,
+            new MetricsConnection.CallStats());
         /*
          * int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
          * count);

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
index 2965055..596b8ab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.apache.hadoop.hbase.client.Table;
 import org.apache.hadoop.hbase.codec.Codec;
@@ -58,8 +59,9 @@ public class TestRpcClientLeaks {
       super(conf, clusterId);
     }
 
-    public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress address) {
-      super(conf, clusterId, address);
+    public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress address,
+        MetricsConnection metrics) {
+      super(conf, clusterId, address, metrics);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
index 8a6b092..a4e55d9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.MetricsConnection;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
 import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@@ -181,7 +182,7 @@ public class TestRpcHandlerException {
           new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
       
       client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
-          rpcServer.getListenerAddress());
+          rpcServer.getListenerAddress(), new MetricsConnection.CallStats());
     } catch (Throwable e) {
       assert(abortable.isAborted() == true);
     } finally {