You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2019/01/24 02:06:09 UTC
[hbase] branch branch-2 updated: HBASE-21720 : metric to measure
how actions are distributed to servers within a MultiAction
This is an automated email from the ASF dual-hosted git repository.
sershe pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 8a7dd2d HBASE-21720 : metric to measure how actions are distributed to servers within a MultiAction
8a7dd2d is described below
commit 8a7dd2de2480b7ea7aa71cfa0b0f2edd241d6c2f
Author: Tommy Li <to...@microsoft.com>
AuthorDate: Wed Jan 23 17:52:58 2019 -0800
HBASE-21720 : metric to measure how actions are distributed to servers within a MultiAction
Signed-off-by: Sergey Shelukhin <se...@apache.org>
---
.../hadoop/hbase/client/MetricsConnection.java | 113 ++++++++++++---------
.../apache/hadoop/hbase/ipc/AbstractRpcClient.java | 13 +++
.../client/TestMultiActionMetricsFromClient.java | 88 ++++++++++++++++
3 files changed, 164 insertions(+), 50 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index e61ba24..a53188f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -74,6 +74,7 @@ public class MetricsConnection implements StatisticTrackable {
private long startTime = 0;
private long callTimeMs = 0;
private int concurrentCallsPerServer = 0;
+ private int numActionsPerServer = 0;
public long getRequestSizeBytes() {
return requestSizeBytes;
@@ -114,6 +115,14 @@ public class MetricsConnection implements StatisticTrackable {
public void setConcurrentCallsPerServer(int callsPerServer) {
this.concurrentCallsPerServer = callsPerServer;
}
+
+ public int getNumActionsPerServer() {
+ return numActionsPerServer;
+ }
+
+ public void setNumActionsPerServer(int numActionsPerServer) {
+ this.numActionsPerServer = numActionsPerServer;
+ }
}
@VisibleForTesting
@@ -281,6 +290,7 @@ public class MetricsConnection implements StatisticTrackable {
@VisibleForTesting protected final Counter hedgedReadOps;
@VisibleForTesting protected final Counter hedgedReadWin;
@VisibleForTesting protected final Histogram concurrentCallsPerServerHist;
+ @VisibleForTesting protected final Histogram numActionsPerServerHist;
// dynamic metrics
@@ -337,8 +347,10 @@ public class MetricsConnection implements StatisticTrackable {
this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
this.multiTracker = new CallTracker(this.registry, "Multi", scope);
this.runnerStats = new RunnerStats(this.registry);
- this.concurrentCallsPerServerHist = registry.histogram(name(MetricsConnection.class,
+ this.concurrentCallsPerServerHist = registry.histogram(name(MetricsConnection.class,
"concurrentCallsPerServer", scope));
+ this.numActionsPerServerHist = registry.histogram(name(MetricsConnection.class,
+ "numActionsPerServer", scope));
this.reporter = JmxReporter.forRegistry(this.registry).build();
this.reporter.start();
@@ -442,59 +454,60 @@ public class MetricsConnection implements StatisticTrackable {
// if we could dispatch based on something static, ie, request Message type.
if (method.getService() == ClientService.getDescriptor()) {
switch(method.getIndex()) {
- case 0:
- assert "Get".equals(method.getName());
- getTracker.updateRpc(stats);
- return;
- case 1:
- assert "Mutate".equals(method.getName());
- final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
- switch(mutationType) {
- case APPEND:
- appendTracker.updateRpc(stats);
- return;
- case DELETE:
- deleteTracker.updateRpc(stats);
+ case 0:
+ assert "Get".equals(method.getName());
+ getTracker.updateRpc(stats);
return;
- case INCREMENT:
- incrementTracker.updateRpc(stats);
+ case 1:
+ assert "Mutate".equals(method.getName());
+ final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
+ switch(mutationType) {
+ case APPEND:
+ appendTracker.updateRpc(stats);
+ return;
+ case DELETE:
+ deleteTracker.updateRpc(stats);
+ return;
+ case INCREMENT:
+ incrementTracker.updateRpc(stats);
+ return;
+ case PUT:
+ putTracker.updateRpc(stats);
+ return;
+ default:
+ throw new RuntimeException("Unrecognized mutation type " + mutationType);
+ }
+ case 2:
+ assert "Scan".equals(method.getName());
+ scanTracker.updateRpc(stats);
return;
- case PUT:
- putTracker.updateRpc(stats);
+ case 3:
+ assert "BulkLoadHFile".equals(method.getName());
+ // use generic implementation
+ break;
+ case 4:
+ assert "PrepareBulkLoad".equals(method.getName());
+ // use generic implementation
+ break;
+ case 5:
+ assert "CleanupBulkLoad".equals(method.getName());
+ // use generic implementation
+ break;
+ case 6:
+ assert "ExecService".equals(method.getName());
+ // use generic implementation
+ break;
+ case 7:
+ assert "ExecRegionServerService".equals(method.getName());
+ // use generic implementation
+ break;
+ case 8:
+ assert "Multi".equals(method.getName());
+ numActionsPerServerHist.update(stats.getNumActionsPerServer());
+ multiTracker.updateRpc(stats);
return;
default:
- throw new RuntimeException("Unrecognized mutation type " + mutationType);
- }
- case 2:
- assert "Scan".equals(method.getName());
- scanTracker.updateRpc(stats);
- return;
- case 3:
- assert "BulkLoadHFile".equals(method.getName());
- // use generic implementation
- break;
- case 4:
- assert "PrepareBulkLoad".equals(method.getName());
- // use generic implementation
- break;
- case 5:
- assert "CleanupBulkLoad".equals(method.getName());
- // use generic implementation
- break;
- case 6:
- assert "ExecService".equals(method.getName());
- // use generic implementation
- break;
- case 7:
- assert "ExecRegionServerService".equals(method.getName());
- // use generic implementation
- break;
- case 8:
- assert "Multi".equals(method.getName());
- multiTracker.updateRpc(stats);
- return;
- default:
- throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
+ throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
}
}
// Fallback to dynamic registry lookup for DDL methods.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index c8904f4..8fc5b0c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -70,6 +70,8 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
/**
* Provides the basics for a RpcClient implementation like configuration and Logging.
* <p>
@@ -401,6 +403,17 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
final RpcCallback<Message> callback) {
final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
cs.setStartTime(EnvironmentEdgeManager.currentTime());
+
+ if (param instanceof ClientProtos.MultiRequest) {
+ ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
+ int numActions = 0;
+ for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
+ numActions += regionAction.getActionCount();
+ }
+
+ cs.setNumActionsPerServer(numActions);
+ }
+
final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiActionMetricsFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiActionMetricsFromClient.java
new file mode 100644
index 0000000..349f052
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiActionMetricsFromClient.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class, ClientTests.class })
+public class TestMultiActionMetricsFromClient {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestMultiActionMetricsFromClient.class);
+
+ private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+ private static final TableName TABLE_NAME = TableName.valueOf("test_table");
+ private static final byte[] FAMILY = Bytes.toBytes("fam1");
+ private static final byte[] QUALIFIER = Bytes.toBytes("qual");
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniCluster(1);
+ TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
+ TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME.META_TABLE_NAME);
+ TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testMultiMetrics() throws Exception {
+ Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+ conf.set(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, "true");
+ ConnectionImplementation conn =
+ (ConnectionImplementation) ConnectionFactory.createConnection(conf);
+
+ try {
+ BufferedMutator mutator = conn.getBufferedMutator(TABLE_NAME);
+ byte[][] keys = {Bytes.toBytes("aaa"), Bytes.toBytes("mmm"), Bytes.toBytes("zzz")};
+ for (byte[] key : keys) {
+ Put p = new Put(key);
+ p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
+ mutator.mutate(p);
+ }
+
+ mutator.flush();
+ mutator.close();
+
+ MetricsConnection metrics = conn.getConnectionMetrics();
+ assertEquals(1, metrics.multiTracker.reqHist.getCount());
+ assertEquals(3, metrics.numActionsPerServerHist.getSnapshot().getMean(), 1e-15);
+ assertEquals(1, metrics.numActionsPerServerHist.getCount());
+ } finally {
+ conn.close();
+ }
+ }
+}