You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by se...@apache.org on 2019/01/24 02:06:10 UTC

[hbase] branch master updated: HBASE-21720 : metric to measure how actions are distributed to servers within a MultiAction

This is an automated email from the ASF dual-hosted git repository.

sershe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new d187af0  HBASE-21720 : metric to measure how actions are distributed to servers within a MultiAction
d187af0 is described below

commit d187af00caace4c86ec52ee79effff86f631fc94
Author: Tommy Li <to...@microsoft.com>
AuthorDate: Wed Jan 23 17:52:58 2019 -0800

    HBASE-21720 : metric to measure how actions are distributed to servers within a MultiAction
    
    Signed-off-by: Sergey Shelukhin <se...@apache.org>
---
 .../hadoop/hbase/client/MetricsConnection.java     | 113 ++++++++++++---------
 .../apache/hadoop/hbase/ipc/AbstractRpcClient.java |  13 +++
 .../client/TestMultiActionMetricsFromClient.java   |  88 ++++++++++++++++
 3 files changed, 164 insertions(+), 50 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
index e61ba24..a53188f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/MetricsConnection.java
@@ -74,6 +74,7 @@ public class MetricsConnection implements StatisticTrackable {
     private long startTime = 0;
     private long callTimeMs = 0;
     private int concurrentCallsPerServer = 0;
+    private int numActionsPerServer = 0;
 
     public long getRequestSizeBytes() {
       return requestSizeBytes;
@@ -114,6 +115,14 @@ public class MetricsConnection implements StatisticTrackable {
     public void setConcurrentCallsPerServer(int callsPerServer) {
       this.concurrentCallsPerServer = callsPerServer;
     }
+
+    public int getNumActionsPerServer() {
+      return numActionsPerServer;
+    }
+
+    public void setNumActionsPerServer(int numActionsPerServer) {
+      this.numActionsPerServer = numActionsPerServer;
+    }
   }
 
   @VisibleForTesting
@@ -281,6 +290,7 @@ public class MetricsConnection implements StatisticTrackable {
   @VisibleForTesting protected final Counter hedgedReadOps;
   @VisibleForTesting protected final Counter hedgedReadWin;
   @VisibleForTesting protected final Histogram concurrentCallsPerServerHist;
+  @VisibleForTesting protected final Histogram numActionsPerServerHist;
 
   // dynamic metrics
 
@@ -337,8 +347,10 @@ public class MetricsConnection implements StatisticTrackable {
     this.putTracker = new CallTracker(this.registry, "Mutate", "Put", scope);
     this.multiTracker = new CallTracker(this.registry, "Multi", scope);
     this.runnerStats = new RunnerStats(this.registry);
-    this.concurrentCallsPerServerHist = registry.histogram(name(MetricsConnection.class, 
+    this.concurrentCallsPerServerHist = registry.histogram(name(MetricsConnection.class,
       "concurrentCallsPerServer", scope));
+    this.numActionsPerServerHist = registry.histogram(name(MetricsConnection.class,
+      "numActionsPerServer", scope));
 
     this.reporter = JmxReporter.forRegistry(this.registry).build();
     this.reporter.start();
@@ -442,59 +454,60 @@ public class MetricsConnection implements StatisticTrackable {
     // if we could dispatch based on something static, ie, request Message type.
     if (method.getService() == ClientService.getDescriptor()) {
       switch(method.getIndex()) {
-      case 0:
-        assert "Get".equals(method.getName());
-        getTracker.updateRpc(stats);
-        return;
-      case 1:
-        assert "Mutate".equals(method.getName());
-        final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
-        switch(mutationType) {
-        case APPEND:
-          appendTracker.updateRpc(stats);
-          return;
-        case DELETE:
-          deleteTracker.updateRpc(stats);
+        case 0:
+          assert "Get".equals(method.getName());
+          getTracker.updateRpc(stats);
           return;
-        case INCREMENT:
-          incrementTracker.updateRpc(stats);
+        case 1:
+          assert "Mutate".equals(method.getName());
+          final MutationType mutationType = ((MutateRequest) param).getMutation().getMutateType();
+          switch(mutationType) {
+            case APPEND:
+              appendTracker.updateRpc(stats);
+              return;
+            case DELETE:
+              deleteTracker.updateRpc(stats);
+              return;
+            case INCREMENT:
+              incrementTracker.updateRpc(stats);
+              return;
+            case PUT:
+              putTracker.updateRpc(stats);
+              return;
+            default:
+              throw new RuntimeException("Unrecognized mutation type " + mutationType);
+          }
+        case 2:
+          assert "Scan".equals(method.getName());
+          scanTracker.updateRpc(stats);
           return;
-        case PUT:
-          putTracker.updateRpc(stats);
+        case 3:
+          assert "BulkLoadHFile".equals(method.getName());
+          // use generic implementation
+          break;
+        case 4:
+          assert "PrepareBulkLoad".equals(method.getName());
+          // use generic implementation
+          break;
+        case 5:
+          assert "CleanupBulkLoad".equals(method.getName());
+          // use generic implementation
+          break;
+        case 6:
+          assert "ExecService".equals(method.getName());
+          // use generic implementation
+          break;
+        case 7:
+          assert "ExecRegionServerService".equals(method.getName());
+          // use generic implementation
+          break;
+        case 8:
+          assert "Multi".equals(method.getName());
+          numActionsPerServerHist.update(stats.getNumActionsPerServer());
+          multiTracker.updateRpc(stats);
           return;
         default:
-          throw new RuntimeException("Unrecognized mutation type " + mutationType);
-        }
-      case 2:
-        assert "Scan".equals(method.getName());
-        scanTracker.updateRpc(stats);
-        return;
-      case 3:
-        assert "BulkLoadHFile".equals(method.getName());
-        // use generic implementation
-        break;
-      case 4:
-        assert "PrepareBulkLoad".equals(method.getName());
-        // use generic implementation
-        break;
-      case 5:
-        assert "CleanupBulkLoad".equals(method.getName());
-        // use generic implementation
-        break;
-      case 6:
-        assert "ExecService".equals(method.getName());
-        // use generic implementation
-        break;
-      case 7:
-        assert "ExecRegionServerService".equals(method.getName());
-        // use generic implementation
-        break;
-      case 8:
-        assert "Multi".equals(method.getName());
-        multiTracker.updateRpc(stats);
-        return;
-      default:
-        throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
+          throw new RuntimeException("Unrecognized ClientService RPC type " + method.getFullName());
       }
     }
     // Fallback to dynamic registry lookup for DDL methods.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index c8904f4..8fc5b0c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -70,6 +70,8 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenSelector;
 
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
+
 /**
  * Provides the basics for a RpcClient implementation like configuration and Logging.
  * <p>
@@ -401,6 +403,17 @@ public abstract class AbstractRpcClient<T extends RpcConnection> implements RpcC
       final RpcCallback<Message> callback) {
     final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
     cs.setStartTime(EnvironmentEdgeManager.currentTime());
+
+    if (param instanceof ClientProtos.MultiRequest) {
+      ClientProtos.MultiRequest req = (ClientProtos.MultiRequest) param;
+      int numActions = 0;
+      for (ClientProtos.RegionAction regionAction : req.getRegionActionList()) {
+        numActions += regionAction.getActionCount();
+      }
+
+      cs.setNumActionsPerServer(numActions);
+    }
+
     final AtomicInteger counter = concurrentCounterCache.getUnchecked(addr);
     Call call = new Call(nextCallId(), md, param, hrc.cellScanner(), returnType,
         hrc.getCallTimeout(), hrc.getPriority(), new RpcCallback<Call>() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiActionMetricsFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiActionMetricsFromClient.java
new file mode 100644
index 0000000..349f052
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMultiActionMetricsFromClient.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.client;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.ClientTests;
+import org.apache.hadoop.hbase.testclassification.SmallTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ SmallTests.class, ClientTests.class })
+public class TestMultiActionMetricsFromClient {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestMultiActionMetricsFromClient.class);
+
+  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final TableName TABLE_NAME = TableName.valueOf("test_table");
+  private static final byte[] FAMILY = Bytes.toBytes("fam1");
+  private static final byte[] QUALIFIER = Bytes.toBytes("qual");
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TEST_UTIL.startMiniCluster(1);
+    TEST_UTIL.getHBaseCluster().waitForActiveAndReadyMaster();
+    TEST_UTIL.waitUntilAllRegionsAssigned(TABLE_NAME.META_TABLE_NAME);
+    TEST_UTIL.createTable(TABLE_NAME, FAMILY);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testMultiMetrics() throws Exception {
+    Configuration conf = new Configuration(TEST_UTIL.getConfiguration());
+    conf.set(MetricsConnection.CLIENT_SIDE_METRICS_ENABLED_KEY, "true");
+    ConnectionImplementation conn =
+      (ConnectionImplementation) ConnectionFactory.createConnection(conf);
+
+    try {
+      BufferedMutator mutator = conn.getBufferedMutator(TABLE_NAME);
+      byte[][] keys = {Bytes.toBytes("aaa"), Bytes.toBytes("mmm"), Bytes.toBytes("zzz")};
+      for (byte[] key : keys) {
+        Put p = new Put(key);
+        p.addColumn(FAMILY, QUALIFIER, Bytes.toBytes(10));
+        mutator.mutate(p);
+      }
+
+      mutator.flush();
+      mutator.close();
+
+      MetricsConnection metrics = conn.getConnectionMetrics();
+      assertEquals(1, metrics.multiTracker.reqHist.getCount());
+      assertEquals(3, metrics.numActionsPerServerHist.getSnapshot().getMean(), 1e-15);
+      assertEquals(1, metrics.numActionsPerServerHist.getCount());
+    } finally {
+      conn.close();
+    }
+  }
+}