You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by el...@apache.org on 2018/08/27 13:55:40 UTC

hadoop git commit: HDDS-313. Add metrics to containerState Machine. Contributed by chencan.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 12b2f362c -> 744ce200d


HDDS-313. Add metrics to containerState Machine. Contributed by chencan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/744ce200
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/744ce200
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/744ce200

Branch: refs/heads/trunk
Commit: 744ce200d20a8f33b1dff1ad561843410c722501
Parents: 12b2f36
Author: Márton Elek <el...@apache.org>
Authored: Mon Aug 27 15:42:22 2018 +0200
Committer: Márton Elek <el...@apache.org>
Committed: Mon Aug 27 15:51:34 2018 +0200

----------------------------------------------------------------------
 .../transport/server/ratis/CSMMetrics.java      | 115 +++++++++++
 .../server/ratis/ContainerStateMachine.java     |  15 ++
 .../transport/server/ratis/TestCSMMetrics.java  | 202 +++++++++++++++++++
 3 files changed, 332 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/744ce200/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
new file mode 100644
index 0000000..b6aed60
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * Metrics source for the Ratis container state machine. It keeps one
+ * counter per state machine operation type and one counter for the
+ * failures of each type.
+ */
+@InterfaceAudience.Private
+@Metrics(about="Container State Machine Metrics", context="dfs")
+public class CSMMetrics {
+  /** Name this source is registered under; also used to unregister it. */
+  public static final String SOURCE_NAME =
+      CSMMetrics.class.getSimpleName();
+
+  // Operation counters. @Metric carries no explicit name, so the published
+  // metric names follow the field names (e.g. NumWriteStateMachineOps);
+  // renaming a field renames the metric.
+  @Metric private MutableCounterLong numWriteStateMachineOps;
+  @Metric private MutableCounterLong numReadStateMachineOps;
+  @Metric private MutableCounterLong numApplyTransactionOps;
+
+  // Failure counters, named the same way.
+  @Metric private MutableCounterLong numWriteStateMachineFails;
+  @Metric private MutableCounterLong numReadStateMachineFails;
+  @Metric private MutableCounterLong numApplyTransactionFails;
+
+  /**
+   * Creates an unregistered instance; {@link #create()} is the factory that
+   * also registers it with the metrics system.
+   */
+  public CSMMetrics() {
+  }
+
+  /**
+   * Registers a new instance with the default metrics system under
+   * {@link #SOURCE_NAME}.
+   *
+   * @return the registered metrics source
+   */
+  public static CSMMetrics create() {
+    final MetricsSystem metricsSystem = DefaultMetricsSystem.instance();
+    final CSMMetrics source = new CSMMetrics();
+    return metricsSystem.register(SOURCE_NAME, "Container State Machine",
+        source);
+  }
+
+  /**
+   * Removes {@link #SOURCE_NAME} from the default metrics system.
+   */
+  public void unRegister() {
+    DefaultMetricsSystem.instance().unregisterSource(SOURCE_NAME);
+  }
+
+  /** Increments the write state machine operation counter. */
+  public void incNumWriteStateMachineOps() {
+    numWriteStateMachineOps.incr();
+  }
+
+  /** Returns the current write state machine operation count. */
+  @VisibleForTesting
+  public long getNumWriteStateMachineOps() {
+    return numWriteStateMachineOps.value();
+  }
+
+  /** Increments the read (query) state machine operation counter. */
+  public void incNumReadStateMachineOps() {
+    numReadStateMachineOps.incr();
+  }
+
+  /** Returns the current read (query) state machine operation count. */
+  @VisibleForTesting
+  public long getNumReadStateMachineOps() {
+    return numReadStateMachineOps.value();
+  }
+
+  /** Increments the apply-transaction operation counter. */
+  public void incNumApplyTransactionsOps() {
+    numApplyTransactionOps.incr();
+  }
+
+  /** Returns the current apply-transaction operation count. */
+  @VisibleForTesting
+  public long getNumApplyTransactionsOps() {
+    return numApplyTransactionOps.value();
+  }
+
+  /** Increments the write state machine failure counter. */
+  public void incNumWriteStateMachineFails() {
+    numWriteStateMachineFails.incr();
+  }
+
+  /** Returns the current write state machine failure count. */
+  @VisibleForTesting
+  public long getNumWriteStateMachineFails() {
+    return numWriteStateMachineFails.value();
+  }
+
+  /** Increments the read (query) state machine failure counter. */
+  public void incNumReadStateMachineFails() {
+    numReadStateMachineFails.incr();
+  }
+
+  /** Returns the current read (query) state machine failure count. */
+  @VisibleForTesting
+  public long getNumReadStateMachineFails() {
+    return numReadStateMachineFails.value();
+  }
+
+  /** Increments the apply-transaction failure counter. */
+  public void incNumApplyTransactionsFails() {
+    numApplyTransactionFails.incr();
+  }
+
+  /** Returns the current apply-transaction failure count. */
+  @VisibleForTesting
+  public long getNumApplyTransactionsFails() {
+    return numApplyTransactionFails.value();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/744ce200/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 52ea3aa..ede87f4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -117,6 +117,10 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final ConcurrentHashMap<Long, CompletableFuture<Message>>
       writeChunkFutureMap;
   private final ConcurrentHashMap<Long, StateMachineHelper> stateMachineMap;
+  /**
+   * Counters for the state machine operations handled by this instance.
+   * Created and registered with the metrics system via
+   * {@link CSMMetrics#create()} in the constructor.
+   */
+  private final CSMMetrics metrics;
 
   public ContainerStateMachine(ContainerDispatcher dispatcher,
       ThreadPoolExecutor chunkExecutor) {
@@ -124,6 +128,7 @@ public class ContainerStateMachine extends BaseStateMachine {
     this.chunkExecutor = chunkExecutor;
     this.writeChunkFutureMap = new ConcurrentHashMap<>();
     this.stateMachineMap = new ConcurrentHashMap<>();
+    metrics = CSMMetrics.create();
   }
 
   @Override
@@ -131,6 +136,10 @@ public class ContainerStateMachine extends BaseStateMachine {
     return storage;
   }
 
+  /**
+   * Returns the metrics source that this state machine updates.
+   *
+   * @return the {@link CSMMetrics} created in the constructor
+   */
+  public CSMMetrics getMetrics() {
+    return this.metrics;
+  }
+
   @Override
   public void initialize(
       RaftServer server, RaftGroupId id, RaftStorage raftStorage)
@@ -220,6 +229,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   @Override
   public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
     try {
+      metrics.incNumWriteStateMachineOps();
       final ContainerCommandRequestProto requestProto =
           getRequestProto(entry.getSmLogEntry().getStateMachineData());
       Type cmdType = requestProto.getCmdType();
@@ -235,6 +245,7 @@ public class ContainerStateMachine extends BaseStateMachine {
       }
       return stateMachineFuture;
     } catch (IOException e) {
+      metrics.incNumWriteStateMachineFails();
       return completeExceptionally(e);
     }
   }
@@ -242,10 +253,12 @@ public class ContainerStateMachine extends BaseStateMachine {
   @Override
   public CompletableFuture<Message> query(Message request) {
     try {
+      // Counted before any work is done, so the op counter also includes
+      // queries that end up failing.
       metrics.incNumReadStateMachineOps();
       final ContainerCommandRequestProto requestProto =
           getRequestProto(request.getContent());
       return CompletableFuture.completedFuture(runCommand(requestProto));
     } catch (IOException e) {
+      // Only IOExceptions raised inside the try block count as failures; the
+      // result code of the returned response is not inspected here.
       metrics.incNumReadStateMachineFails();
       return completeExceptionally(e);
     }
   }
@@ -347,6 +360,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   @Override
   public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
     try {
+      metrics.incNumApplyTransactionsOps();
       ContainerCommandRequestProto requestProto =
           getRequestProto(trx.getSMLogEntry().getData());
       Preconditions.checkState(!HddsUtils.isReadOnly(requestProto));
@@ -357,6 +371,7 @@ public class ContainerStateMachine extends BaseStateMachine {
       return stateMachineMap.get(requestProto.getContainerID())
           .executeContainerCommand(requestProto, index);
     } catch (IOException e) {
+      metrics.incNumApplyTransactionsFails();
       return completeExceptionally(e);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/744ce200/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
new file mode 100644
index 0000000..8b324b5
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+    .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.*;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.RatisTestHelper;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.transport.server
+    .XceiverServerSpi;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+
+import org.apache.ratis.RatisHelper;
+import org.apache.ratis.rpc.RpcType;
+import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.util.CheckedBiConsumer;
+
+import java.util.function.BiConsumer;
+
+import org.junit.Test;
+import org.junit.Assert;
+
+/**
+ * Tests the metrics that ContainerStateMachine publishes through
+ * {@link CSMMetrics}.
+ */
+public class TestCSMMetrics {
+  static final String TEST_DIR
+      = GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator;
+
+  /**
+   * Two-argument function that may throw a checked exception.
+   */
+  @FunctionalInterface
+  interface CheckedBiFunction<LEFT, RIGHT, OUT, THROWABLE extends Throwable> {
+    OUT apply(LEFT left, RIGHT right) throws THROWABLE;
+  }
+
+  @Test
+  public void testContainerStateMachineMetrics() throws Exception {
+    runContainerStateMachineMetrics(1,
+        (pipeline, conf) -> RatisTestHelper.initRatisConf(GRPC, conf),
+        XceiverClientRatis::newXceiverClientRatis,
+        TestCSMMetrics::newXceiverServerRatis,
+        (dn, p) -> initXceiverServerRatis(GRPC, dn, p));
+  }
+
+  /**
+   * Starts a Ratis pipeline, sends one WriteChunk and one ReadChunk, and
+   * checks the {@link CSMMetrics} counters before and after each request.
+   *
+   * @param numDatanodes number of datanodes in the test pipeline
+   * @param initConf prepares the configuration for the pipeline
+   * @param createClient creates the client used to send requests
+   * @param createServer creates the server for one datanode
+   * @param initServer sets up the Raft group on one datanode
+   */
+  static void runContainerStateMachineMetrics(
+      int numDatanodes,
+      BiConsumer<Pipeline, OzoneConfiguration> initConf,
+      TestCSMMetrics.CheckedBiFunction<Pipeline, OzoneConfiguration,
+          XceiverClientSpi, IOException> createClient,
+      TestCSMMetrics.CheckedBiFunction<DatanodeDetails, OzoneConfiguration,
+          XceiverServerSpi, IOException> createServer,
+      CheckedBiConsumer<DatanodeDetails, Pipeline, IOException> initServer)
+      throws Exception {
+    final List<XceiverServerSpi> servers = new ArrayList<>();
+    XceiverClientSpi client = null;
+    String containerName = OzoneUtils.getRequestID();
+    try {
+      final Pipeline pipeline = ContainerTestHelper.createPipeline(
+          numDatanodes);
+      final OzoneConfiguration conf = new OzoneConfiguration();
+      initConf.accept(pipeline, conf);
+
+      for (DatanodeDetails dn : pipeline.getMachines()) {
+        final XceiverServerSpi s = createServer.apply(dn, conf);
+        servers.add(s);
+        s.start();
+        initServer.accept(dn, pipeline);
+      }
+
+      client = createClient.apply(pipeline, conf);
+      client.connect();
+
+      // Before Read Chunk/Write Chunk
+      MetricsRecordBuilder metric = getMetrics(CSMMetrics.SOURCE_NAME);
+      assertCounter("NumWriteStateMachineOps", 0L, metric);
+      assertCounter("NumReadStateMachineOps", 0L, metric);
+      assertCounter("NumApplyTransactionOps", 0L, metric);
+      assertNoFailures(metric);
+
+      // Write Chunk
+      BlockID blockID = ContainerTestHelper.getTestBlockID(ContainerTestHelper.
+          getTestContainerID());
+      ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+          ContainerTestHelper.getWriteChunkRequest(
+              pipeline, blockID, 1024);
+      ContainerCommandResponseProto response =
+          client.sendCommand(writeChunkRequest);
+      Assert.assertEquals(ContainerProtos.Result.SUCCESS,
+          response.getResult());
+
+      metric = getMetrics(CSMMetrics.SOURCE_NAME);
+      assertCounter("NumWriteStateMachineOps", 1L, metric);
+      assertCounter("NumApplyTransactionOps", 1L, metric);
+      assertNoFailures(metric);
+
+      //Read Chunk
+      ContainerProtos.ContainerCommandRequestProto readChunkRequest =
+          ContainerTestHelper.getReadChunkRequest(pipeline, writeChunkRequest
+              .getWriteChunk());
+      response = client.sendCommand(readChunkRequest);
+      Assert.assertEquals(ContainerProtos.Result.SUCCESS,
+          response.getResult());
+
+      metric = getMetrics(CSMMetrics.SOURCE_NAME);
+      assertCounter("NumReadStateMachineOps", 1L, metric);
+      assertCounter("NumApplyTransactionOps", 1L, metric);
+      assertNoFailures(metric);
+    } finally {
+      if (client != null) {
+        client.close();
+      }
+      servers.forEach(XceiverServerSpi::stop);
+      // Nothing shown here unregisters the CSMMetrics source when the
+      // servers stop. Drop it explicitly so a later registration under the
+      // same fixed SOURCE_NAME in this JVM does not collide.
+      DefaultMetricsSystem.instance().unregisterSource(CSMMetrics.SOURCE_NAME);
+    }
+  }
+
+  /**
+   * Asserts that none of the {@link CSMMetrics} failure counters has been
+   * incremented.
+   */
+  private static void assertNoFailures(MetricsRecordBuilder metric) {
+    assertCounter("NumWriteStateMachineFails", 0L, metric);
+    assertCounter("NumReadStateMachineFails", 0L, metric);
+    assertCounter("NumApplyTransactionFails", 0L, metric);
+  }
+
+  /**
+   * Creates a Ratis server for {@code dn}. The server uses the datanode's
+   * RATIS port, a per-datanode storage directory under {@link #TEST_DIR},
+   * and a stub dispatcher.
+   */
+  static XceiverServerRatis newXceiverServerRatis(
+      DatanodeDetails dn, OzoneConfiguration conf) throws IOException {
+    conf.setInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
+        dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue());
+    final String dir = TEST_DIR + dn.getUuid();
+    conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
+
+    final ContainerDispatcher dispatcher = new TestContainerDispatcher();
+    return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher);
+  }
+
+  /**
+   * Reinitializes the Raft server on {@code dd} with the pipeline's group.
+   * The client used for this is closed afterwards so it does not leak.
+   */
+  static void initXceiverServerRatis(
+      RpcType rpc, DatanodeDetails dd, Pipeline pipeline) throws IOException {
+    final RaftPeer p = RatisHelper.toRaftPeer(dd);
+    try (RaftClient client = RatisHelper.newRaftClient(rpc, p)) {
+      RaftGroupId groupId = pipeline.getId().getRaftGroupID();
+      client.reinitialize(RatisHelper.newRaftGroup(groupId,
+          pipeline.getMachines()), p.getId());
+    }
+  }
+
+  /**
+   * Stub dispatcher that answers every request with
+   * {@code ContainerTestHelper.getCreateContainerResponse(msg)}.
+   */
+  private static class TestContainerDispatcher implements ContainerDispatcher {
+    /**
+     * Dispatches commands to container layer.
+     *
+     * @param msg - Command Request
+     * @return Command Response
+     */
+    @Override
+    public ContainerCommandResponseProto dispatch(
+        ContainerCommandRequestProto msg) {
+      return ContainerTestHelper.getCreateContainerResponse(msg);
+    }
+
+    @Override
+    public void init() {
+    }
+
+    @Override
+    public void shutdown() {
+    }
+
+    @Override
+    public Handler getHandler(ContainerProtos.ContainerType containerType) {
+      return null;
+    }
+
+    @Override
+    public void setScmId(String scmId) {
+
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org