You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by nd...@apache.org on 2015/10/08 02:05:41 UTC
hbase git commit: HBASE-12911 Client-side metrics
Repository: hbase
Updated Branches:
refs/heads/master e1fd3526b -> 7e30436e3
HBASE-12911 Client-side metrics
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7e30436e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7e30436e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7e30436e
Branch: refs/heads/master
Commit: 7e30436e3fa84525b85b05b9e23cb01b2ada7c12
Parents: e1fd352
Author: Nick Dimiduk <nd...@apache.org>
Authored: Mon Oct 5 10:19:40 2015 -0700
Committer: Nick Dimiduk <nd...@apache.org>
Committed: Wed Oct 7 17:01:56 2015 -0700
----------------------------------------------------------------------
hbase-client/pom.xml | 4 +
.../hadoop/hbase/client/ClusterConnection.java | 5 +
.../apache/hadoop/hbase/client/Connection.java | 1 -
.../hbase/client/ConnectionImplementation.java | 27 +-
.../apache/hadoop/hbase/client/MetaCache.java | 9 +
.../hadoop/hbase/client/MetricsConnection.java | 324 +++++++++++++++++++
.../hadoop/hbase/ipc/AbstractRpcClient.java | 25 +-
.../org/apache/hadoop/hbase/ipc/AsyncCall.java | 6 +-
.../hadoop/hbase/ipc/AsyncRpcChannel.java | 9 +-
.../apache/hadoop/hbase/ipc/AsyncRpcClient.java | 96 ++++--
.../hbase/ipc/AsyncServerResponseHandler.java | 5 +-
.../java/org/apache/hadoop/hbase/ipc/Call.java | 11 +-
.../hadoop/hbase/ipc/RpcClientFactory.java | 22 +-
.../apache/hadoop/hbase/ipc/RpcClientImpl.java | 50 ++-
.../hbase/client/TestMetricsConnection.java | 120 +++++++
.../hbase/regionserver/HRegionServer.java | 2 +-
.../hbase/regionserver/MetricsRegionServer.java | 3 +-
.../hadoop/hbase/client/TestClientTimeouts.java | 7 +-
.../hadoop/hbase/ipc/AbstractTestIPC.java | 18 +-
.../apache/hadoop/hbase/ipc/TestAsyncIPC.java | 14 +-
.../hbase/ipc/TestGlobalEventLoopGroup.java | 6 +-
.../org/apache/hadoop/hbase/ipc/TestIPC.java | 4 +-
.../hadoop/hbase/ipc/TestRpcClientLeaks.java | 6 +-
.../hbase/ipc/TestRpcHandlerException.java | 3 +-
24 files changed, 667 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/pom.xml
----------------------------------------------------------------------
diff --git a/hbase-client/pom.xml b/hbase-client/pom.xml
index 425bd05..401e28e 100644
--- a/hbase-client/pom.xml
+++ b/hbase-client/pom.xml
@@ -189,6 +189,10 @@
<artifactId>log4j</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.yammer.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ </dependency>
</dependencies>
<profiles>
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
index b3d99ae..99071fa 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnection.java
@@ -297,4 +297,9 @@ public interface ClusterConnection extends HConnection {
*/
ClientBackoffPolicy getBackoffPolicy();
+ /**
+  * @return the {@link MetricsConnection} instance associated with this connection, or
+  *   {@code null} when the implementation does not collect client-side metrics (as
+  *   {@code ConnectionImplementation} does unless
+  *   {@link MetricsConnection#CLIENT_SIDE_METRICS_ENABLED_KEY} is set to {@code true})
+  */
+ MetricsConnection getConnectionMetrics();
+
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
index dab4905..a3f6fe6 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/Connection.java
@@ -174,5 +174,4 @@ public interface Connection extends Abortable, Closeable {
* @return true if this connection is closed
*/
boolean isClosed();
-
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
index fbb77dc..9f03184 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionImplementation.java
@@ -18,6 +18,8 @@
*/
package org.apache.hadoop.hbase.client;
+import static org.apache.hadoop.hbase.client.MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingRpcChannel;
import com.google.protobuf.RpcController;
@@ -165,11 +167,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
// Client rpc instance.
private RpcClient rpcClient;
- private MetaCache metaCache = new MetaCache();
+ private final MetaCache metaCache;
+ private final MetricsConnection metrics;
private int refCount;
- private User user;
+ protected User user;
private RpcRetryingCallerFactory rpcCallerFactory;
@@ -198,11 +201,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
- HConstants.DEFAULT_USE_META_REPLICAS);
+ HConstants.DEFAULT_USE_META_REPLICAS);
this.numTries = tableConfig.getRetriesNumber();
this.rpcTimeout = conf.getInt(
- HConstants.HBASE_RPC_TIMEOUT_KEY,
- HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
+ HConstants.HBASE_RPC_TIMEOUT_KEY,
+ HConstants.DEFAULT_HBASE_RPC_TIMEOUT);
if (conf.getBoolean(CLIENT_NONCES_ENABLED_KEY, true)) {
synchronized (nonceGeneratorCreateLock) {
if (nonceGenerator == null) {
@@ -219,6 +222,12 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
this.rpcCallerFactory = RpcRetryingCallerFactory.instantiate(conf, interceptor, this.stats);
this.backoffPolicy = ClientBackoffPolicyFactory.create(conf);
this.asyncProcess = createAsyncProcess(this.conf);
+ if (conf.getBoolean(CLIENT_SIDE_METRICS_ENABLED_KEY, false)) {
+ this.metrics = new MetricsConnection(this);
+ } else {
+ this.metrics = null;
+ }
+ this.metaCache = new MetaCache(this.metrics);
boolean shouldListen = conf.getBoolean(HConstants.STATUS_PUBLISHED,
HConstants.STATUS_PUBLISHED_DEFAULT);
@@ -377,6 +386,11 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
return new HBaseAdmin(this);
}
+ @Override
+ public MetricsConnection getConnectionMetrics() {
+   // Null when CLIENT_SIDE_METRICS_ENABLED_KEY was not enabled at construction time.
+   return metrics;
+ }
+
private ExecutorService getBatchPool() {
if (batchPool == null) {
synchronized (this) {
@@ -2140,6 +2154,9 @@ class ConnectionImplementation implements ClusterConnection, Closeable {
}
closeMaster();
shutdownPools();
+ if (this.metrics != null) {
+ this.metrics.shutdown();
+ }
this.closed = true;
closeZooKeeperWatcher();
this.stubs.clear();
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
index 8e1c93c..b23ca70 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetaCache.java
@@ -59,6 +59,12 @@ public class MetaCache {
// The access to this attribute must be protected by a lock on cachedRegionLocations
private final Set<ServerName> cachedServers = new ConcurrentSkipListSet<ServerName>();
+ private final MetricsConnection metrics;
+
+ public MetaCache(MetricsConnection metrics) {
+ this.metrics = metrics; // may be null; cache lookups then skip hit/miss counting
+ }
+
/**
* Search the cache for a location that fits our table and row key.
* Return null if no suitable region is located.
@@ -74,6 +80,7 @@ public class MetaCache {
Entry<byte[], RegionLocations> e = tableLocations.floorEntry(row);
if (e == null) {
+ if (metrics != null) metrics.incrMetaCacheMiss();
return null;
}
RegionLocations possibleRegion = e.getValue();
@@ -94,10 +101,12 @@ public class MetaCache {
// HConstants.EMPTY_END_ROW) check itself will pass.
if (Bytes.equals(endKey, HConstants.EMPTY_END_ROW) ||
Bytes.compareTo(endKey, 0, endKey.length, row, 0, row.length) > 0) {
+ if (metrics != null) metrics.incrMetaCacheHit();
return possibleRegion;
}
// Passed all the way through, so we got nothing - complete cache miss
+ if (metrics != null) metrics.incrMetaCacheMiss();
return null;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
new file mode 100644
index 0000000..f34fb8a
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -0,0 +1,324 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.protobuf.Descriptors.MethodDescriptor;
+import com.google.protobuf.Message;
+import com.yammer.metrics.core.Counter;
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.MetricsRegistry;
+import com.yammer.metrics.core.Timer;
+import com.yammer.metrics.reporting.JmxReporter;
+import com.yammer.metrics.util.RatioGauge;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * This class is for maintaining the various connection statistics and publishing them through
+ * the metrics interfaces.
+ *
+ * This class manages its own {@link MetricsRegistry} and {@link JmxReporter} so as to not
+ * conflict with other uses of Yammer Metrics within the client application. Instantiating
+ * this class implicitly creates and "starts" instances of these classes; be sure to call
+ * {@link #shutdown()} to terminate the thread pools they allocate.
+ */
+@InterfaceAudience.Private
+public class MetricsConnection {
+
+ /** Set this key to {@code true} to enable metrics collection of client requests. */
+ public static final String CLIENT_SIDE_METRICS_ENABLED_KEY = "hbase.client.metrics.enable";
+
+ private static final String DRTN_BASE = "rpcCallDurationMs_";
+ private static final String REQ_BASE = "rpcCallRequestSizeBytes_";
+ private static final String RESP_BASE = "rpcCallResponseSizeBytes_";
+ private static final String CLIENT_SVC = ClientService.getDescriptor().getName();
+
+ /**
+  * Mutable holder for the measurements collected about one RPC call as it percolates through
+  * the client. Every value reads as 0 until the corresponding setter has been called.
+  */
+ public static class CallStats {
+   private long requestSizeBytes;
+   private long responseSizeBytes;
+   private long startTime;
+   private long callTimeMs;
+
+   /** @return the recorded request size, in bytes */
+   public long getRequestSizeBytes() {
+     return this.requestSizeBytes;
+   }
+
+   /** @param requestSizeBytes the request size to record, in bytes */
+   public void setRequestSizeBytes(long requestSizeBytes) {
+     this.requestSizeBytes = requestSizeBytes;
+   }
+
+   /** @return the recorded response size, in bytes */
+   public long getResponseSizeBytes() {
+     return this.responseSizeBytes;
+   }
+
+   /** @param responseSizeBytes the response size to record, in bytes */
+   public void setResponseSizeBytes(long responseSizeBytes) {
+     this.responseSizeBytes = responseSizeBytes;
+   }
+
+   /** @return the recorded start time of the call */
+   public long getStartTime() {
+     return this.startTime;
+   }
+
+   /** @param startTime the start time of the call to record */
+   public void setStartTime(long startTime) {
+     this.startTime = startTime;
+   }
+
+   /** @return the recorded duration of the call, in milliseconds */
+   public long getCallTimeMs() {
+     return this.callTimeMs;
+   }
+
+   /** @param callTimeMs the duration of the call to record, in milliseconds */
+   public void setCallTimeMs(long callTimeMs) {
+     this.callTimeMs = callTimeMs;
+   }
+ }
+
+ /**
+  * The set of metrics kept for one kind of RPC: a timer for the call duration and histograms
+  * for the request and response sizes. It is a static nested class because it reads nothing
+  * from the enclosing {@link MetricsConnection} instance, so it need not hold a reference to
+  * one.
+  */
+ @VisibleForTesting
+ protected static final class CallTracker {
+   private final String name;
+   @VisibleForTesting final Timer callTimer;
+   @VisibleForTesting final Histogram reqHist;
+   @VisibleForTesting final Histogram respHist;
+
+   /**
+    * @param registry the registry in which the three metrics are created
+    * @param name the RPC method name, appended to the client service name after a "_"
+    * @param subName optional qualifier appended as {@code (subName)}; may be {@code null}
+    * @param scope the metric scope handed through to the registry
+    */
+   private CallTracker(MetricsRegistry registry, String name, String subName, String scope) {
+     StringBuilder sb = new StringBuilder(CLIENT_SVC).append("_").append(name);
+     if (subName != null) {
+       sb.append("(").append(subName).append(")");
+     }
+     this.name = sb.toString();
+     this.callTimer = registry.newTimer(MetricsConnection.class, DRTN_BASE + this.name, scope);
+     this.reqHist = registry.newHistogram(MetricsConnection.class, REQ_BASE + this.name, scope);
+     this.respHist = registry.newHistogram(MetricsConnection.class, RESP_BASE + this.name, scope);
+   }
+
+   /** Same as the four-argument constructor, without a {@code subName} qualifier. */
+   private CallTracker(MetricsRegistry registry, String name, String scope) {
+     this(registry, name, null, scope);
+   }
+
+   /**
+    * Feed one finished call into the timer and both size histograms.
+    *
+    * @param stats the measurements of the call; its call time is interpreted as milliseconds
+    */
+   public void updateRpc(CallStats stats) {
+     this.callTimer.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
+     this.reqHist.update(stats.getRequestSizeBytes());
+     this.respHist.update(stats.getResponseSizeBytes());
+   }
+
+   @Override
+   public String toString() {
+     return "CallTracker:" + name;
+   }
+ }
+
+ /**
+  * Callback that creates a metric of type {@code T}, so that callers can stay agnostic of
+  * which registry factory method is dispatched to.
+  */
+ private interface NewMetric<T> {
+   T newMetric(Class<?> clazz, String name, String scope);
+ }
+
+ /** Anticipated number of metric entries */
+ private static final int CAPACITY = 50;
+ /** Default load factor; the same value {@link java.util.HashMap} uses by default (0.75) */
+ private static final float LOAD_FACTOR = 0.75f;
+ /**
+ * Anticipated number of concurrent accessor threads, from
+ * {@link ConnectionImplementation#getBatchPool()}
+ */
+ private static final int CONCURRENCY_LEVEL = 256;
+
+ private final MetricsRegistry registry;
+ private final JmxReporter reporter;
+ private final String scope;
+
+ private final NewMetric<Timer> timerFactory = new NewMetric<Timer>() {
+ @Override public Timer newMetric(Class<?> clazz, String name, String scope) {
+ return registry.newTimer(clazz, name, scope);
+ }
+ };
+
+ private final NewMetric<Histogram> histogramFactory = new NewMetric<Histogram>() {
+ @Override public Histogram newMetric(Class<?> clazz, String name, String scope) {
+ return registry.newHistogram(clazz, name, scope);
+ }
+ };
+
+ // static metrics
+
+ @VisibleForTesting protected final Counter metaCacheHits;
+ @VisibleForTesting protected final Counter metaCacheMisses;
+ @VisibleForTesting protected final CallTracker getTracker;
+ @VisibleForTesting protected final CallTracker scanTracker;
+ @VisibleForTesting protected final CallTracker appendTracker;
+ @VisibleForTesting protected final CallTracker deleteTracker;
+ @VisibleForTesting protected final CallTracker incrementTracker;
+ @VisibleForTesting protected final CallTracker putTracker;
+ @VisibleForTesting protected final CallTracker multiTracker;
+
+ // dynamic metrics
+
+ // These maps are used to cache references to the metric instances that are managed by the
+ // registry. I don't think their use perfectly removes redundant allocations, but it's
+ // a big improvement over calling registry.newMetric each time.
+ @VisibleForTesting protected final ConcurrentMap<String, Timer> rpcTimers =
+ new ConcurrentHashMap<>(CAPACITY, LOAD_FACTOR, CONCURRENCY_LEVEL);
+ @VisibleForTesting protected final ConcurrentMap<String, Histogram> rpcHistograms =
+ new ConcurrentHashMap<>(CAPACITY * 2 /* tracking both request and response sizes */,
+ LOAD_FACTOR, CONCURRENCY_LEVEL);
+
+ /**
+  * Creates the private registry, registers the pool gauges, the meta cache counters and the
+  * per-method call trackers under a scope taken from {@code conn.toString()}, and starts the
+  * JMX reporter.
+  *
+  * @param conn the connection whose thread pools and RPCs are measured
+  */
+ public MetricsConnection(final ConnectionImplementation conn) {
+   this.scope = conn.toString();
+   this.registry = new MetricsRegistry();
+   // NOTE(review): presumably either pool can still be null here, since
+   // ConnectionImplementation#getBatchPool() creates the batch pool lazily; confirm against
+   // getCurrentBatchPool()/getCurrentMetaLookupPool(). The gauges tolerate null either way.
+   final ThreadPoolExecutor batchPool = (ThreadPoolExecutor) conn.getCurrentBatchPool();
+   final ThreadPoolExecutor metaPool = (ThreadPoolExecutor) conn.getCurrentMetaLookupPool();
+
+   this.registry.newGauge(this.getClass(), "executorPoolActiveThreads", scope,
+     activeThreadRatio(batchPool));
+   this.registry.newGauge(this.getClass(), "metaPoolActiveThreads", scope,
+     activeThreadRatio(metaPool));
+   this.metaCacheHits = registry.newCounter(this.getClass(), "metaCacheHits", scope);
+   this.metaCacheMisses = registry.newCounter(this.getClass(), "metaCacheMisses", scope);
+   this.getTracker = new CallTracker(this.registry, "Get", scope);
+   this.scanTracker = new CallTracker(this.registry, "Scan", scope);
+   this.appendTracker = new CallTracker(this.registry, "Mutate", "Append", scope);
+   this.deleteTracker = new CallTracker(this.registry, "Mutate", "Delete", scope);
+   this.incrementTracker = new CallTracker(this.registry, "Mutate", "Increment", scope);
+   this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
+   this.multiTracker = new CallTracker(this.registry, "Multi", scope);
+   this.reporter = new JmxReporter(this.registry);
+   this.reporter.start();
+ }
+
+ /**
+  * Builds a gauge reporting active threads over the maximum pool size of {@code pool}. When
+  * {@code pool} is {@code null} both terms are 0 instead of the read throwing a
+  * NullPointerException; RatioGauge reports NaN for a zero denominator.
+  */
+ private static RatioGauge activeThreadRatio(final ThreadPoolExecutor pool) {
+   return new RatioGauge() {
+     @Override protected double getNumerator() {
+       return pool == null ? 0 : pool.getActiveCount();
+     }
+     @Override protected double getDenominator() {
+       return pool == null ? 0 : pool.getMaximumPoolSize();
+     }
+   };
+ }
+
+ /**
+  * Stops the JMX reporter first and then shuts down the metrics registry. Must be called when
+  * the owning connection closes so the thread pools they allocated are terminated.
+  */
+ public void shutdown() {
+   reporter.shutdown();
+   registry.shutdown();
+ }
+
+ /**
+  * Produce an instance of {@link CallStats} for clients to attach to RPCs.
+  *
+  * @return a new, zeroed instance on every call; nothing is pooled
+  */
+ public static CallStats newCallStats() {
+   // TODO: instance pool to reduce GC?
+   final CallStats fresh = new CallStats();
+   return fresh;
+ }
+
+ /** Count one lookup that was answered from the meta cache. */
+ public void incrMetaCacheHit() {
+   this.metaCacheHits.inc();
+ }
+
+ /** Count one lookup that the meta cache could not answer. */
+ public void incrMetaCacheMiss() {
+   this.metaCacheMisses.inc();
+ }
+
+ /**
+  * Get the metric cached for {@code key} in {@code map}, creating it with {@code factory} on
+  * first use.
+  *
+  * @param key the metric name, also used as the cache key
+  * @param map the cache to consult and populate
+  * @param factory creates the metric when the cache has no entry for {@code key}
+  * @return the instance held in {@code map} for {@code key}; when several threads race to
+  *   create it, all of them get the one instance that was inserted first
+  */
+ private <T> T getMetric(String key, ConcurrentMap<String, T> map, NewMetric<T> factory) {
+   T t = map.get(key);
+   if (t == null) {
+     final T created = factory.newMetric(this.getClass(), key, scope);
+     // putIfAbsent returns the previous mapping when another thread inserted first; prefer
+     // that instance so callers never update a metric that is not the cached one.
+     final T raced = map.putIfAbsent(key, created);
+     t = raced != null ? raced : created;
+   }
+   return t;
+ }
+
+ /**
+  * Update call stats for non-critical-path methods, i.e. those without a dedicated
+  * {@link CallTracker}. The timer and both size histograms are looked up, and created on
+  * first use, under names built from the service name and the method name.
+  */
+ private void updateRpcGeneric(MethodDescriptor method, CallStats stats) {
+   final String suffix = method.getService().getName() + "_" + method.getName();
+
+   final Timer duration = getMetric(DRTN_BASE + suffix, rpcTimers, timerFactory);
+   duration.update(stats.getCallTimeMs(), TimeUnit.MILLISECONDS);
+
+   final Histogram requestSize = getMetric(REQ_BASE + suffix, rpcHistograms, histogramFactory);
+   requestSize.update(stats.getRequestSizeBytes());
+
+   final Histogram responseSize = getMetric(RESP_BASE + suffix, rpcHistograms, histogramFactory);
+   responseSize.update(stats.getResponseSizeBytes());
+ }
+
+ /**
+ * Report RPC context to metrics system.
+ *
+ * Calls whose service is the {@code ClientService} descriptor are dispatched on the method
+ * index: Get, Scan and Multi update their dedicated trackers, and Mutate updates the tracker
+ * matching the mutation type of the request. BulkLoadHFile, ExecService and
+ * ExecRegionServerService, as well as every method of any other service, are recorded through
+ * {@link #updateRpcGeneric(MethodDescriptor, CallStats)}.
+ *
+ * @param method descriptor of the invoked RPC method
+ * @param param the request message; cast to {@link MutateRequest} when the method is Mutate
+ * @param stats the measurements collected for the call
+ * @throws RuntimeException if the mutation type or the ClientService method index is not one
+ * of those handled here
+ */
+ public void updateRpc(MethodDescriptor method, Message param, CallStats stats) {
+ // This implementation is tied directly to protobuf implementation details. It would be better
+ // if we could dispatch based on something static, i.e., the request Message type.
+ if (method.getService() == ClientService.getDescriptor()) {
+ switch(method.getIndex()) {
+ case 0:
+ assert "Get".equals(method.getName());
+ getTracker.updateRpc(stats);
+ return;
+ case 1:
+ assert "Mutate".equals(method.getName());
+ final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
+ switch(mutationType) {
+ case APPEND:
+ appendTracker.updateRpc(stats);
+ return;
+ case DELETE:
+ deleteTracker.updateRpc(stats);
+ return;
+ case INCREMENT:
+ incrementTracker.updateRpc(stats);
+ return;
+ case PUT:
+ putTracker.updateRpc(stats);
+ return;
+ default:
+ throw new RuntimeException("Unrecognized mutation type " + mutationType);
+ }
+ case 2:
+ assert "Scan".equals(method.getName());
+ scanTracker.updateRpc(stats);
+ return;
+ case 3:
+ assert "BulkLoadHFile".equals(method.getName());
+ // use generic implementation (break leaves the switch and reaches updateRpcGeneric below)
+ break;
+ case 4:
+ assert "ExecService".equals(method.getName());
+ // use generic implementation
+ break;
+ case 5:
+ assert "ExecRegionServerService".equals(method.getName());
+ // use generic implementation
+ break;
+ case 6:
+ assert "Multi".equals(method.getName());
+ multiTracker.updateRpc(stats);
+ return;
+ default:
+ throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
+ }
+ }
+ // Fall back to dynamic registry lookup: ClientService methods without a tracker and all
+ // methods of other services (e.g. DDL methods) end up here.
+ updateRpcGeneric(method, stats);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 9be370d..6f5e78a 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.codec.KeyValueCodec;
import org.apache.hadoop.hbase.security.User;
@@ -55,6 +56,7 @@ public abstract class AbstractRpcClient implements RpcClient {
protected final Configuration conf;
protected String clusterId;
protected final SocketAddress localAddr;
+ protected final MetricsConnection metrics;
protected UserProvider userProvider;
protected final IPCUtil ipcUtil;
@@ -79,8 +81,10 @@ public abstract class AbstractRpcClient implements RpcClient {
* @param conf configuration
* @param clusterId the cluster id
* @param localAddr client socket bind address.
+ * @param metrics the connection metrics
*/
- public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
+ public AbstractRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
+ MetricsConnection metrics) {
this.userProvider = UserProvider.instantiate(conf);
this.localAddr = localAddr;
this.tcpKeepAlive = conf.getBoolean("hbase.ipc.client.tcpkeepalive", true);
@@ -100,6 +104,7 @@ public abstract class AbstractRpcClient implements RpcClient {
this.connectTO = conf.getInt(SOCKET_TIMEOUT_CONNECT, DEFAULT_SOCKET_TIMEOUT_CONNECT);
this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
+ this.metrics = metrics;
// login the server principal (if using secure Hadoop)
if (LOG.isDebugEnabled()) {
@@ -205,19 +210,20 @@ public abstract class AbstractRpcClient implements RpcClient {
pcrc = new PayloadCarryingRpcController();
}
- long startTime = 0;
- if (LOG.isTraceEnabled()) {
- startTime = EnvironmentEdgeManager.currentTime();
- }
Pair<Message, CellScanner> val;
try {
- val = call(pcrc, md, param, returnType, ticket, isa);
+ final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
+ cs.setStartTime(EnvironmentEdgeManager.currentTime());
+ val = call(pcrc, md, param, returnType, ticket, isa, cs);
// Shove the results into controller so can be carried across the proxy/pb service void.
pcrc.setCellScanner(val.getSecond());
+ cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
+ if (metrics != null) {
+ metrics.updateRpc(md, param, cs);
+ }
if (LOG.isTraceEnabled()) {
- long callTime = EnvironmentEdgeManager.currentTime() - startTime;
- LOG.trace("Call: " + md.getName() + ", callTime: " + callTime + "ms");
+ LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
}
return val.getFirst();
} catch (Throwable e) {
@@ -242,7 +248,8 @@ public abstract class AbstractRpcClient implements RpcClient {
*/
protected abstract Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
- InetSocketAddress isa) throws IOException, InterruptedException;
+ InetSocketAddress isa, MetricsConnection.CallStats callStats)
+ throws IOException, InterruptedException;
@Override
public BlockingRpcChannel createBlockingRpcChannel(final ServerName sn, final User ticket,
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
index 431c669..a5da0dc 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncCall.java
@@ -25,6 +25,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.ExceptionUtil;
@@ -49,6 +50,7 @@ public class AsyncCall extends DefaultPromise<Message> {
final Message responseDefaultType;
final long startTime;
final long rpcTimeout;
+ final MetricsConnection.CallStats callStats;
/**
* Constructor
@@ -61,7 +63,8 @@ public class AsyncCall extends DefaultPromise<Message> {
* @param responseDefaultType the default response type
*/
public AsyncCall(EventLoop eventLoop, int connectId, Descriptors.MethodDescriptor md, Message
- param, PayloadCarryingRpcController controller, Message responseDefaultType) {
+ param, PayloadCarryingRpcController controller, Message responseDefaultType,
+ MetricsConnection.CallStats callStats) {
super(eventLoop);
this.id = connectId;
@@ -73,6 +76,7 @@ public class AsyncCall extends DefaultPromise<Message> {
this.startTime = EnvironmentEdgeManager.currentTime();
this.rpcTimeout = controller.hasCallTimeout() ? controller.getCallTimeout() : 0;
+ this.callStats = callStats;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
index 43d75f9..44e8322 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcChannel.java
@@ -49,6 +49,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AuthenticationProtos;
@@ -310,10 +311,10 @@ public class AsyncRpcChannel {
*/
public Promise<Message> callMethod(final Descriptors.MethodDescriptor method,
final PayloadCarryingRpcController controller, final Message request,
- final Message responsePrototype) {
+ final Message responsePrototype, MetricsConnection.CallStats callStats) {
final AsyncCall call =
new AsyncCall(channel.eventLoop(), client.callIdCnt.getAndIncrement(), method, request,
- controller, responsePrototype);
+ controller, responsePrototype, callStats);
controller.notifyOnCancel(new RpcCallback<Object>() {
@Override
public void run(Object parameter) {
@@ -433,7 +434,7 @@ public class AsyncRpcChannel {
ByteBuf b = channel.alloc().directBuffer(4 + totalSize);
try(ByteBufOutputStream out = new ByteBufOutputStream(b)) {
- IPCUtil.write(out, rh, call.param, cellBlock);
+ call.callStats.setRequestSizeBytes(IPCUtil.write(out, rh, call.param, cellBlock));
}
channel.writeAndFlush(b).addListener(new CallWriteListener(this, call.id));
@@ -579,8 +580,6 @@ public class AsyncRpcChannel {
/**
* Clean up calls.
- *
- * @param cleanAll true if all calls should be cleaned, false for only the timed out calls
*/
private void cleanupCalls() {
List<AsyncCall> toCleanup = new ArrayList<AsyncCall>();
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
index e1662f3..60e9add 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncRpcClient.java
@@ -52,7 +52,9 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.JVM;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.PoolMap;
@@ -146,12 +148,13 @@ public class AsyncRpcClient extends AbstractRpcClient {
* @param configuration to HBase
* @param clusterId for the cluster
* @param localAddress local address to connect to
+ * @param metrics the connection metrics
* @param channelInitializer for custom channel handlers
*/
- @VisibleForTesting
- AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
+ protected AsyncRpcClient(Configuration configuration, String clusterId,
+ SocketAddress localAddress, MetricsConnection metrics,
ChannelInitializer<SocketChannel> channelInitializer) {
- super(configuration, clusterId, localAddress);
+ super(configuration, clusterId, localAddress, metrics);
if (LOG.isDebugEnabled()) {
LOG.debug("Starting async Hbase RPC client");
@@ -191,15 +194,28 @@ public class AsyncRpcClient extends AbstractRpcClient {
}
}
+ /** Used in test only. */
+ AsyncRpcClient(Configuration configuration) {
+ this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null);
+ }
+
+ /** Used in test only. */
+ AsyncRpcClient(Configuration configuration,
+ ChannelInitializer<SocketChannel> channelInitializer) {
+ this(configuration, HConstants.CLUSTER_ID_DEFAULT, null, null, channelInitializer);
+ }
+
/**
* Constructor
*
* @param configuration to HBase
* @param clusterId for the cluster
* @param localAddress local address to connect to
+ * @param metrics the connection metrics
*/
- public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress) {
- this(configuration, clusterId, localAddress, null);
+ public AsyncRpcClient(Configuration configuration, String clusterId, SocketAddress localAddress,
+ MetricsConnection metrics) {
+ this(configuration, clusterId, localAddress, metrics, null);
}
/**
@@ -219,13 +235,14 @@ public class AsyncRpcClient extends AbstractRpcClient {
@Override
protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc,
Descriptors.MethodDescriptor md, Message param, Message returnType, User ticket,
- InetSocketAddress addr) throws IOException, InterruptedException {
+ InetSocketAddress addr, MetricsConnection.CallStats callStats)
+ throws IOException, InterruptedException {
if (pcrc == null) {
pcrc = new PayloadCarryingRpcController();
}
final AsyncRpcChannel connection = createRpcChannel(md.getService().getName(), addr, ticket);
- Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType);
+ Promise<Message> promise = connection.callMethod(md, pcrc, param, returnType, callStats);
long timeout = pcrc.hasCallTimeout() ? pcrc.getCallTimeout() : 0;
try {
Message response = timeout > 0 ? promise.get(timeout, TimeUnit.MILLISECONDS) : promise.get();
@@ -244,40 +261,49 @@ public class AsyncRpcClient extends AbstractRpcClient {
/**
* Call method async
*/
- private void callMethod(Descriptors.MethodDescriptor md, final PayloadCarryingRpcController pcrc,
- Message param, Message returnType, User ticket, InetSocketAddress addr,
- final RpcCallback<Message> done) {
+ private void callMethod(final Descriptors.MethodDescriptor md,
+ final PayloadCarryingRpcController pcrc, final Message param, Message returnType, User ticket,
+ InetSocketAddress addr, final RpcCallback<Message> done) {
final AsyncRpcChannel connection;
try {
connection = createRpcChannel(md.getService().getName(), addr, ticket);
-
- connection.callMethod(md, pcrc, param, returnType).addListener(
+ final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
+ GenericFutureListener<Future<Message>> listener =
new GenericFutureListener<Future<Message>>() {
- @Override
- public void operationComplete(Future<Message> future) throws Exception {
- if(!future.isSuccess()){
- Throwable cause = future.cause();
- if (cause instanceof IOException) {
- pcrc.setFailed((IOException) cause);
- }else{
- pcrc.setFailed(new IOException(cause));
- }
- }else{
- try {
- done.run(future.get());
- }catch (ExecutionException e){
- Throwable cause = e.getCause();
- if (cause instanceof IOException) {
- pcrc.setFailed((IOException) cause);
- }else{
- pcrc.setFailed(new IOException(cause));
+ @Override
+ public void operationComplete(Future<Message> future) throws Exception {
+ cs.setCallTimeMs(EnvironmentEdgeManager.currentTime() - cs.getStartTime());
+ if (metrics != null) {
+ metrics.updateRpc(md, param, cs);
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Call: " + md.getName() + ", callTime: " + cs.getCallTimeMs() + "ms");
+ }
+ if (!future.isSuccess()) {
+ Throwable cause = future.cause();
+ if (cause instanceof IOException) {
+ pcrc.setFailed((IOException) cause);
+ } else {
+ pcrc.setFailed(new IOException(cause));
+ }
+ } else {
+ try {
+ done.run(future.get());
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ if (cause instanceof IOException) {
+ pcrc.setFailed((IOException) cause);
+ } else {
+ pcrc.setFailed(new IOException(cause));
+ }
+ } catch (InterruptedException e) {
+ pcrc.setFailed(new IOException(e));
+ }
}
- }catch (InterruptedException e){
- pcrc.setFailed(new IOException(e));
}
- }
- }
- });
+ };
+ cs.setStartTime(EnvironmentEdgeManager.currentTime());
+ connection.callMethod(md, pcrc, param, returnType, cs).addListener(listener);
} catch (StoppedRpcClientException|FailedServerException e) {
pcrc.setFailed(e);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
index f7aa8a9..8f6c85b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AsyncServerResponseHandler.java
@@ -24,8 +24,6 @@ import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.IOException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -39,8 +37,6 @@ import com.google.protobuf.Message;
*/
@InterfaceAudience.Private
public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
- private static final Log LOG = LogFactory.getLog(AsyncServerResponseHandler.class.getName());
-
private final AsyncRpcChannel channel;
/**
@@ -102,6 +98,7 @@ public class AsyncServerResponseHandler extends ChannelInboundHandlerAdapter {
cellBlockScanner = channel.client.createCellScanner(cellBlock);
}
+ call.callStats.setResponseSizeBytes(totalSize); // set before the call is completed
call.setSuccess(value, cellBlockScanner);
}
} catch (IOException e) {
// Treat this as a fatal condition and close this connection
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
index df32730..5f90837 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/Call.java
@@ -21,6 +21,7 @@ import com.google.protobuf.Descriptors;
import com.google.protobuf.Message;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -41,16 +42,18 @@ public class Call {
Message responseDefaultType;
IOException error; // exception, null if value
volatile boolean done; // true when call is done
- long startTime;
final Descriptors.MethodDescriptor md;
final int timeout; // timeout in millisecond for this call; 0 means infinite.
+ final MetricsConnection.CallStats callStats;
protected Call(int id, final Descriptors.MethodDescriptor md, Message param,
- final CellScanner cells, final Message responseDefaultType, int timeout) {
+ final CellScanner cells, final Message responseDefaultType, int timeout,
+ MetricsConnection.CallStats callStats) {
this.param = param;
this.md = md;
this.cells = cells;
- this.startTime = EnvironmentEdgeManager.currentTime();
+ this.callStats = callStats;
+ this.callStats.setStartTime(EnvironmentEdgeManager.currentTime());
this.responseDefaultType = responseDefaultType;
this.id = id;
this.timeout = timeout;
@@ -122,6 +125,6 @@ public class Call {
}
public long getStartTime() {
- return this.startTime;
+ return this.callStats.getStartTime();
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
index 10ddc56..822daca 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientFactory.java
@@ -17,8 +17,10 @@
*/
package org.apache.hadoop.hbase.ipc;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import java.net.SocketAddress;
@@ -37,15 +39,23 @@ public final class RpcClientFactory {
private RpcClientFactory() {
}
+ /** Helper method for tests only. Creates an {@code RpcClient} without metrics. */
+ @VisibleForTesting
+ public static RpcClient createClient(Configuration conf, String clusterId) {
+ return createClient(conf, clusterId, null);
+ }
+
/**
* Creates a new RpcClient by the class defined in the configuration or falls back to
* RpcClientImpl
* @param conf configuration
* @param clusterId the cluster id
+ * @param metrics the connection metrics
* @return newly created RpcClient
*/
- public static RpcClient createClient(Configuration conf, String clusterId) {
- return createClient(conf, clusterId, null);
+ public static RpcClient createClient(Configuration conf, String clusterId,
+ MetricsConnection metrics) {
+ return createClient(conf, clusterId, null, metrics);
}
/**
@@ -54,16 +64,18 @@ public final class RpcClientFactory {
* @param conf configuration
* @param clusterId the cluster id
* @param localAddr client socket bind address.
+ * @param metrics the connection metrics
* @return newly created RpcClient
*/
public static RpcClient createClient(Configuration conf, String clusterId,
- SocketAddress localAddr) {
+ SocketAddress localAddr, MetricsConnection metrics) {
String rpcClientClass =
conf.get(CUSTOM_RPC_CLIENT_IMPL_CONF_KEY, AsyncRpcClient.class.getName());
return ReflectionUtils.instantiateWithCustomCtor(
rpcClientClass,
- new Class[] { Configuration.class, String.class, SocketAddress.class },
- new Object[] { conf, clusterId, localAddr }
+ new Class[] { Configuration.class, String.class, SocketAddress.class,
+ MetricsConnection.class },
+ new Object[] { conf, clusterId, localAddr, metrics }
);
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
index cb18952..3fb7061 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/RpcClientImpl.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.ipc;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.Descriptors.MethodDescriptor;
import com.google.protobuf.Message;
import com.google.protobuf.Message.Builder;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.exceptions.ConnectionClosingException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
@@ -911,7 +913,8 @@ public class RpcClientImpl extends AbstractRpcClient {
checkIsOpen(); // Now we're checking that it didn't became idle in between.
try {
- IPCUtil.write(this.out, header, call.param, cellBlock);
+ call.callStats.setRequestSizeBytes(
+ IPCUtil.write(this.out, header, call.param, cellBlock));
} catch (IOException e) {
// We set the value inside the synchronized block, this way the next in line
// won't even try to write. Otherwise we might miss a call in the calls map?
@@ -964,12 +967,20 @@ public class RpcClientImpl extends AbstractRpcClient {
int readSoFar = IPCUtil.getTotalSizeWhenWrittenDelimited(responseHeader);
int whatIsLeftToRead = totalSize - readSoFar;
IOUtils.skipFully(in, whatIsLeftToRead);
+ if (call != null) {
+ call.callStats.setResponseSizeBytes(totalSize);
+ call.callStats.setCallTimeMs(
+ EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
+ }
return;
}
if (responseHeader.hasException()) {
ExceptionResponse exceptionResponse = responseHeader.getException();
RemoteException re = createRemoteException(exceptionResponse);
+ call.callStats.setResponseSizeBytes(totalSize); // record stats before completing the call
+ call.callStats.setCallTimeMs(
+ EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
call.setException(re);
if (isFatalConnectionException(exceptionResponse)) {
markClosed(re);
}
@@ -988,6 +999,9 @@ public class RpcClientImpl extends AbstractRpcClient {
cellBlockScanner = ipcUtil.createCellScanner(this.codec, this.compressor, cellBlock);
}
+ call.callStats.setResponseSizeBytes(totalSize); // record stats before completing the call
+ call.callStats.setCallTimeMs(
+ EnvironmentEdgeManager.currentTime() - call.callStats.getStartTime());
call.setResponse(value, cellBlockScanner);
}
} catch (IOException e) {
if (expectedCall) call.setException(e);
@@ -1075,13 +1089,15 @@ public class RpcClientImpl extends AbstractRpcClient {
}
/**
- * Construct an IPC cluster client whose values are of the {@link Message} class.
+ * Used in test only. Construct an IPC cluster client whose values are of the
+ * {@link Message} class.
* @param conf configuration
* @param clusterId the cluster id
* @param factory socket factory
*/
+ @VisibleForTesting
RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory) {
- this(conf, clusterId, factory, null);
+ this(conf, clusterId, factory, null, null);
}
/**
@@ -1090,10 +1106,11 @@ public class RpcClientImpl extends AbstractRpcClient {
* @param clusterId the cluster id
* @param factory socket factory
* @param localAddr client socket bind address
+ * @param metrics the connection metrics
*/
RpcClientImpl(Configuration conf, String clusterId, SocketFactory factory,
- SocketAddress localAddr) {
- super(conf, clusterId, localAddr);
+ SocketAddress localAddr, MetricsConnection metrics) {
+ super(conf, clusterId, localAddr, metrics);
this.socketFactory = factory;
this.connections = new PoolMap<ConnectionId, Connection>(getPoolType(conf), getPoolSize(conf));
@@ -1101,25 +1118,27 @@ public class RpcClientImpl extends AbstractRpcClient {
}
/**
- * Construct an IPC client for the cluster <code>clusterId</code> with the default SocketFactory
- * @param conf configuration
- * @param clusterId the cluster id
+ * Used in test only. Construct an IPC client for the cluster {@code clusterId} with
+ * the default SocketFactory
*/
- public RpcClientImpl(Configuration conf, String clusterId) {
- this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null);
+ @VisibleForTesting
+ RpcClientImpl(Configuration conf, String clusterId) {
+ this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), null, null);
}
/**
- * Construct an IPC client for the cluster <code>clusterId</code> with the default SocketFactory
+ * Construct an IPC client for the cluster {@code clusterId} with the default SocketFactory
*
* This method is called with reflection by the RpcClientFactory to create an instance
*
* @param conf configuration
* @param clusterId the cluster id
* @param localAddr client socket bind address.
+ * @param metrics the connection metrics
*/
- public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr) {
- this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr);
+ public RpcClientImpl(Configuration conf, String clusterId, SocketAddress localAddr,
+ MetricsConnection metrics) {
+ this(conf, clusterId, NetUtils.getDefaultSocketFactory(conf), localAddr, metrics);
}
/** Stop all threads related to this client. No further calls may be made
@@ -1182,7 +1201,8 @@ public class RpcClientImpl extends AbstractRpcClient {
*/
@Override
protected Pair<Message, CellScanner> call(PayloadCarryingRpcController pcrc, MethodDescriptor md,
- Message param, Message returnType, User ticket, InetSocketAddress addr)
+ Message param, Message returnType, User ticket, InetSocketAddress addr,
+ MetricsConnection.CallStats callStats)
throws IOException, InterruptedException {
if (pcrc == null) {
pcrc = new PayloadCarryingRpcController();
@@ -1190,7 +1210,7 @@ public class RpcClientImpl extends AbstractRpcClient {
CellScanner cells = pcrc.cellScanner();
final Call call = new Call(this.callIdCnt.getAndIncrement(), md, param, cells, returnType,
- pcrc.getCallTimeout());
+ pcrc.getCallTimeout(), callStats); // caller-supplied stats, not a fresh instance
final Connection connection = getConnection(ticket, call, addr);
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
new file mode 100644
index 0000000..88a653e
--- /dev/null
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestMetricsConnection.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import com.google.protobuf.ByteString;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ClientService;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.GetRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MultiRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutateRequest;
+import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier;
+import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.MetricsTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+
+@Category({ClientTests.class, MetricsTests.class, SmallTests.class})
+public class TestMetricsConnection {
+
+ private static MetricsConnection METRICS;
+
+ @BeforeClass
+ public static void beforeClass() {
+ ConnectionImplementation mocked = Mockito.mock(ConnectionImplementation.class);
+ Mockito.when(mocked.toString()).thenReturn("mocked-connection");
+ METRICS = new MetricsConnection(mocked); // pass the stubbed mock, not a fresh one
+ }
+
+ @AfterClass
+ public static void afterClass() {
+ METRICS.shutdown();
+ }
+
+ @Test
+ public void testStaticMetrics() throws IOException {
+ final byte[] foo = Bytes.toBytes("foo");
+ final RegionSpecifier region = RegionSpecifier.newBuilder()
+ .setValue(ByteString.EMPTY)
+ .setType(RegionSpecifierType.REGION_NAME)
+ .build();
+ final int loop = 5;
+
+ for (int i = 0; i < loop; i++) {
+ METRICS.updateRpc(
+ ClientService.getDescriptor().findMethodByName("Get"),
+ GetRequest.getDefaultInstance(),
+ MetricsConnection.newCallStats());
+ METRICS.updateRpc(
+ ClientService.getDescriptor().findMethodByName("Scan"),
+ ScanRequest.getDefaultInstance(),
+ MetricsConnection.newCallStats());
+ METRICS.updateRpc(
+ ClientService.getDescriptor().findMethodByName("Multi"),
+ MultiRequest.getDefaultInstance(),
+ MetricsConnection.newCallStats());
+ METRICS.updateRpc(
+ ClientService.getDescriptor().findMethodByName("Mutate"),
+ MutateRequest.newBuilder()
+ .setMutation(ProtobufUtil.toMutation(MutationType.APPEND, new Append(foo)))
+ .setRegion(region)
+ .build(),
+ MetricsConnection.newCallStats());
+ METRICS.updateRpc(
+ ClientService.getDescriptor().findMethodByName("Mutate"),
+ MutateRequest.newBuilder()
+ .setMutation(ProtobufUtil.toMutation(MutationType.DELETE, new Delete(foo)))
+ .setRegion(region)
+ .build(),
+ MetricsConnection.newCallStats());
+ METRICS.updateRpc(
+ ClientService.getDescriptor().findMethodByName("Mutate"),
+ MutateRequest.newBuilder()
+ .setMutation(ProtobufUtil.toMutation(MutationType.INCREMENT, new Increment(foo)))
+ .setRegion(region)
+ .build(),
+ MetricsConnection.newCallStats());
+ METRICS.updateRpc(
+ ClientService.getDescriptor().findMethodByName("Mutate"),
+ MutateRequest.newBuilder()
+ .setMutation(ProtobufUtil.toMutation(MutationType.PUT, new Put(foo)))
+ .setRegion(region)
+ .build(),
+ MetricsConnection.newCallStats());
+ }
+ for (MetricsConnection.CallTracker t : new MetricsConnection.CallTracker[] {
+ METRICS.getTracker, METRICS.scanTracker, METRICS.multiTracker, METRICS.appendTracker,
+ METRICS.deleteTracker, METRICS.incrementTracker, METRICS.putTracker
+ }) {
+ Assert.assertEquals("Failed to invoke callTimer on " + t, loop, t.callTimer.count());
+ Assert.assertEquals("Failed to invoke reqHist on " + t, loop, t.reqHist.count());
+ Assert.assertEquals("Failed to invoke respHist on " + t, loop, t.respHist.count());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index 7653fa1..b799837 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -851,7 +851,7 @@ public class HRegionServer extends HasThread implements
// Setup RPC client for master communication
rpcClient = RpcClientFactory.createClient(conf, clusterId, new InetSocketAddress(
- rpcServices.isa.getAddress(), 0));
+ rpcServices.isa.getAddress(), 0), clusterConnection.getConnectionMetrics());
boolean onlyMetaRefresh = false;
int storefileRefreshPeriod = conf.getInt(
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
index b2cb772..91f494a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServer.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.classification.InterfaceStability;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
@@ -48,7 +49,7 @@ public class MetricsRegionServer {
this.serverSource = serverSource;
}
- // for unit-test usage
+ @VisibleForTesting
public MetricsRegionServerSource getMetricsSource() {
return serverSource;
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
index 418a0ac..8edfd9e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestClientTimeouts.java
@@ -134,9 +134,10 @@ public class TestClientTimeouts {
/**
* Rpc Channel implementation with RandomTimeoutBlockingRpcChannel
*/
- public static class RandomTimeoutRpcClient extends RpcClientImpl{
- public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr) {
- super(conf, clusterId, localAddr);
+ public static class RandomTimeoutRpcClient extends RpcClientImpl {
+ public RandomTimeoutRpcClient(Configuration conf, String clusterId, SocketAddress localAddr,
+ MetricsConnection metrics) {
+ super(conf, clusterId, localAddr, metrics);
}
// Return my own instance, one that does random timeouts
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
index 32eb9f6..d427419 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/AbstractTestIPC.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@@ -163,7 +164,8 @@ public abstract class AbstractTestIPC {
final String message = "hello";
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage(message).build();
Pair<Message, CellScanner> r =
- client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address);
+ client.call(null, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
+ new MetricsConnection.CallStats());
assertTrue(r.getSecond() == null);
// Silly assertion that the message is in the returned pb.
assertTrue(r.getFirst().toString().contains(message));
@@ -205,7 +207,8 @@ public abstract class AbstractTestIPC {
PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
Pair<Message, CellScanner> r =
- client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address);
+ client.call(pcrc, md, param, md.getOutputType().toProto(), User.getCurrent(), address,
+ new MetricsConnection.CallStats());
int index = 0;
while (r.getSecond().advance()) {
assertTrue(CELL.equals(r.getSecond().current()));
@@ -231,7 +234,8 @@ public abstract class AbstractTestIPC {
InetSocketAddress address = rpcServer.getListenerAddress();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
- client.call(null, md, param, null, User.getCurrent(), address);
+ client.call(null, md, param, null, User.getCurrent(), address,
+ new MetricsConnection.CallStats());
fail("Expected an exception to have been thrown!");
} catch (Exception e) {
LOG.info("Caught expected exception: " + e.toString());
@@ -255,10 +259,10 @@ public abstract class AbstractTestIPC {
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
for (int i = 0; i < 10; i++) {
- client.call(
- new PayloadCarryingRpcController(
- CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param, md
- .getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress());
+ client.call(new PayloadCarryingRpcController(
+ CellUtil.createCellScanner(ImmutableList.<Cell> of(CELL))), md, param,
+ md.getOutputType().toProto(), User.getCurrent(), rpcServer.getListenerAddress(),
+ new MetricsConnection.CallStats());
}
verify(scheduler, times(10)).dispatch((CallRunner) anyObject());
} finally {
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
index 18e3798..d761ae9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestAsyncIPC.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.codec.Codec;
@@ -116,7 +117,7 @@ public class TestAsyncIPC extends AbstractTestIPC {
@Override
protected AsyncRpcClient createRpcClientNoCodec(Configuration conf) {
setConf(conf);
- return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null) {
+ return new AsyncRpcClient(conf) {
@Override
Codec getCodec() {
@@ -129,15 +130,13 @@ public class TestAsyncIPC extends AbstractTestIPC {
@Override
protected AsyncRpcClient createRpcClient(Configuration conf) {
setConf(conf);
- return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
+ return new AsyncRpcClient(conf);
}
@Override
protected AsyncRpcClient createRpcClientRTEDuringConnectionSetup(Configuration conf) {
setConf(conf);
- return new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null,
- new ChannelInitializer<SocketChannel>() {
-
+ return new AsyncRpcClient(conf, new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addFirst(new ChannelOutboundHandlerAdapter() {
@@ -248,7 +247,7 @@ public class TestAsyncIPC extends AbstractTestIPC {
TestRpcServer rpcServer = new TestRpcServer();
MethodDescriptor md = SERVICE.getDescriptorForType().findMethodByName("echo");
EchoRequestProto param = EchoRequestProto.newBuilder().setMessage("hello").build();
- AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
+ AsyncRpcClient client = new AsyncRpcClient(conf);
KeyValue kv = BIG_CELL;
Put p = new Put(CellUtil.cloneRow(kv));
for (int i = 0; i < cellcount; i++) {
@@ -282,7 +281,8 @@ public class TestAsyncIPC extends AbstractTestIPC {
PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
// Pair<Message, CellScanner> response =
- client.call(pcrc, md, builder.build(), param, user, address);
+ client.call(pcrc, md, builder.build(), param, user, address,
+ new MetricsConnection.CallStats());
/*
* int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
* count);
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java
index 60dbd1b..52c77e7 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestGlobalEventLoopGroup.java
@@ -37,15 +37,15 @@ public class TestGlobalEventLoopGroup {
public void test() {
Configuration conf = HBaseConfiguration.create();
conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, true);
- AsyncRpcClient client = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
+ AsyncRpcClient client = new AsyncRpcClient(conf);
assertNotNull(AsyncRpcClient.GLOBAL_EVENT_LOOP_GROUP);
- AsyncRpcClient client1 = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
+ AsyncRpcClient client1 = new AsyncRpcClient(conf);
assertSame(client.bootstrap.group(), client1.bootstrap.group());
client1.close();
assertFalse(client.bootstrap.group().isShuttingDown());
conf.setBoolean(AsyncRpcClient.USE_GLOBAL_EVENT_LOOP_GROUP, false);
- AsyncRpcClient client2 = new AsyncRpcClient(conf, HConstants.CLUSTER_ID_DEFAULT, null);
+ AsyncRpcClient client2 = new AsyncRpcClient(conf);
assertNotSame(client.bootstrap.group(), client2.bootstrap.group());
client2.close();
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
index 67e4e4f..d1b8202 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestIPC.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.codec.Codec;
@@ -145,7 +146,8 @@ public class TestIPC extends AbstractTestIPC {
PayloadCarryingRpcController pcrc =
new PayloadCarryingRpcController(CellUtil.createCellScanner(cells));
// Pair<Message, CellScanner> response =
- client.call(pcrc, md, builder.build(), param, user, address);
+ client.call(pcrc, md, builder.build(), param, user, address,
+ new MetricsConnection.CallStats());
/*
* int count = 0; while (p.getSecond().advance()) { count++; } assertEquals(cells.size(),
* count);
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
index 2965055..596b8ab 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcClientLeaks.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.codec.Codec;
@@ -58,8 +59,9 @@ public class TestRpcClientLeaks {
super(conf, clusterId);
}
- public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress address) {
- super(conf, clusterId, address);
+ public MyRpcClientImpl(Configuration conf, String clusterId, SocketAddress address,
+ MetricsConnection metrics) {
+ super(conf, clusterId, address, metrics);
}
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/7e30436e/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
index 8a6b092..a4e55d9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestRpcHandlerException.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.MetricsConnection;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoRequestProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EchoResponseProto;
import org.apache.hadoop.hbase.ipc.protobuf.generated.TestProtos.EmptyRequestProto;
@@ -181,7 +182,7 @@ public class TestRpcHandlerException {
new PayloadCarryingRpcController(CellUtil.createCellScanner(ImmutableList.of(CELL)));
client.call(controller, md, param, md.getOutputType().toProto(), User.getCurrent(),
- rpcServer.getListenerAddress());
+ rpcServer.getListenerAddress(), new MetricsConnection.CallStats());
} catch (Throwable e) {
assert(abortable.isAborted() == true);
} finally {