You are viewing a plain text version of this content. The canonical link to the original archive page was a hyperlink that is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by el...@apache.org on 2018/08/27 13:55:40 UTC
hadoop git commit: HDDS-313. Add metrics to containerState Machine.
Contributed by chencan.
Repository: hadoop
Updated Branches:
refs/heads/trunk 12b2f362c -> 744ce200d
HDDS-313. Add metrics to containerState Machine. Contributed by chencan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/744ce200
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/744ce200
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/744ce200
Branch: refs/heads/trunk
Commit: 744ce200d20a8f33b1dff1ad561843410c722501
Parents: 12b2f36
Author: Márton Elek <el...@apache.org>
Authored: Mon Aug 27 15:42:22 2018 +0200
Committer: Márton Elek <el...@apache.org>
Committed: Mon Aug 27 15:51:34 2018 +0200
----------------------------------------------------------------------
.../transport/server/ratis/CSMMetrics.java | 115 +++++++++++
.../server/ratis/ContainerStateMachine.java | 15 ++
.../transport/server/ratis/TestCSMMetrics.java | 202 +++++++++++++++++++
3 files changed, 332 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/744ce200/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
new file mode 100644
index 0000000..b6aed60
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/CSMMetrics.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * Operation and failure counters for the Container State Machine.
+ */
+@InterfaceAudience.Private
+@Metrics(about="Container State Machine Metrics", context="dfs")
+public class CSMMetrics {
+ public static final String SOURCE_NAME =
+ CSMMetrics.class.getSimpleName();
+
+ // Ratis op metrics
+ private @Metric MutableCounterLong numWriteStateMachineOps;
+ private @Metric MutableCounterLong numReadStateMachineOps;
+ private @Metric MutableCounterLong numApplyTransactionOps;
+
+ // Failure Metrics
+ private @Metric MutableCounterLong numWriteStateMachineFails;
+ private @Metric MutableCounterLong numReadStateMachineFails;
+ private @Metric MutableCounterLong numApplyTransactionFails;
+
+ public CSMMetrics() {
+ }
+
+ public static CSMMetrics create() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ return ms.register(SOURCE_NAME,
+ "Container State Machine",
+ new CSMMetrics());
+ }
+
+ public void incNumWriteStateMachineOps() {
+ numWriteStateMachineOps.incr();
+ }
+
+ public void incNumReadStateMachineOps() {
+ numReadStateMachineOps.incr();
+ }
+
+ public void incNumApplyTransactionsOps() {
+ numApplyTransactionOps.incr();
+ }
+
+ public void incNumWriteStateMachineFails() {
+ numWriteStateMachineFails.incr();
+ }
+
+ public void incNumReadStateMachineFails() {
+ numReadStateMachineFails.incr();
+ }
+
+ public void incNumApplyTransactionsFails() {
+ numApplyTransactionFails.incr();
+ }
+
+ @VisibleForTesting
+ public long getNumWriteStateMachineOps() {
+ return numWriteStateMachineOps.value();
+ }
+
+ @VisibleForTesting
+ public long getNumReadStateMachineOps() {
+ return numReadStateMachineOps.value();
+ }
+
+ @VisibleForTesting
+ public long getNumApplyTransactionsOps() {
+ return numApplyTransactionOps.value();
+ }
+
+ @VisibleForTesting
+ public long getNumWriteStateMachineFails() {
+ return numWriteStateMachineFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumReadStateMachineFails() {
+ return numReadStateMachineFails.value();
+ }
+
+ @VisibleForTesting
+ public long getNumApplyTransactionsFails() {
+ return numApplyTransactionFails.value();
+ }
+
+ public void unRegister() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ ms.unregisterSource(SOURCE_NAME);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/744ce200/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 52ea3aa..ede87f4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -117,6 +117,10 @@ public class ContainerStateMachine extends BaseStateMachine {
private final ConcurrentHashMap<Long, CompletableFuture<Message>>
writeChunkFutureMap;
private final ConcurrentHashMap<Long, StateMachineHelper> stateMachineMap;
+ /**
+ * Operation and failure counters for this container state machine.
+ */
+ private final CSMMetrics metrics;
public ContainerStateMachine(ContainerDispatcher dispatcher,
ThreadPoolExecutor chunkExecutor) {
@@ -124,6 +128,7 @@ public class ContainerStateMachine extends BaseStateMachine {
this.chunkExecutor = chunkExecutor;
this.writeChunkFutureMap = new ConcurrentHashMap<>();
this.stateMachineMap = new ConcurrentHashMap<>();
+ metrics = CSMMetrics.create();
}
@Override
@@ -131,6 +136,10 @@ public class ContainerStateMachine extends BaseStateMachine {
return storage;
}
+ public CSMMetrics getMetrics() {
+ return metrics;
+ }
+
@Override
public void initialize(
RaftServer server, RaftGroupId id, RaftStorage raftStorage)
@@ -220,6 +229,7 @@ public class ContainerStateMachine extends BaseStateMachine {
@Override
public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
try {
+ metrics.incNumWriteStateMachineOps();
final ContainerCommandRequestProto requestProto =
getRequestProto(entry.getSmLogEntry().getStateMachineData());
Type cmdType = requestProto.getCmdType();
@@ -235,6 +245,7 @@ public class ContainerStateMachine extends BaseStateMachine {
}
return stateMachineFuture;
} catch (IOException e) {
+ metrics.incNumWriteStateMachineFails();
return completeExceptionally(e);
}
}
@@ -242,10 +253,12 @@ public class ContainerStateMachine extends BaseStateMachine {
@Override
public CompletableFuture<Message> query(Message request) {
try {
+ metrics.incNumReadStateMachineOps();
final ContainerCommandRequestProto requestProto =
getRequestProto(request.getContent());
return CompletableFuture.completedFuture(runCommand(requestProto));
} catch (IOException e) {
+ metrics.incNumReadStateMachineFails();
return completeExceptionally(e);
}
}
@@ -347,6 +360,7 @@ public class ContainerStateMachine extends BaseStateMachine {
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
try {
+ metrics.incNumApplyTransactionsOps();
ContainerCommandRequestProto requestProto =
getRequestProto(trx.getSMLogEntry().getData());
Preconditions.checkState(!HddsUtils.isReadOnly(requestProto));
@@ -357,6 +371,7 @@ public class ContainerStateMachine extends BaseStateMachine {
return stateMachineMap.get(requestProto.getContainerID())
.executeContainerCommand(requestProto, index);
} catch (IOException e) {
+ metrics.incNumApplyTransactionsFails();
return completeExceptionally(e);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/744ce200/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
new file mode 100644
index 0000000..8b324b5
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.transport.server.ratis;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+
+import org.apache.hadoop.hdds.client.BlockID;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandRequestProto;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
+ .ContainerCommandResponseProto;
+import org.apache.hadoop.hdds.scm.*;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.RatisTestHelper;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.interfaces.Handler;
+import org.apache.hadoop.ozone.container.common.transport.server
+ .XceiverServerSpi;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+
+import org.apache.ratis.RatisHelper;
+import org.apache.ratis.rpc.RpcType;
+import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.util.CheckedBiConsumer;
+
+import java.util.function.BiConsumer;
+
+import org.junit.Test;
+import org.junit.Assert;
+
+/**
+ * This class tests the metrics of ContainerStateMachine.
+ */
+public class TestCSMMetrics {
+ static final String TEST_DIR
+ = GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator;
+
+ @FunctionalInterface
+ interface CheckedBiFunction<LEFT, RIGHT, OUT, THROWABLE extends Throwable> {
+ OUT apply(LEFT left, RIGHT right) throws THROWABLE;
+ }
+
+ @Test
+ public void testContainerStateMachineMetrics() throws Exception {
+ runContainerStateMachineMetrics(1,
+ (pipeline, conf) -> RatisTestHelper.initRatisConf(GRPC, conf),
+ XceiverClientRatis::newXceiverClientRatis,
+ TestCSMMetrics::newXceiverServerRatis,
+ (dn, p) -> initXceiverServerRatis(GRPC, dn, p));
+ }
+
+ static void runContainerStateMachineMetrics(
+ int numDatanodes,
+ BiConsumer<Pipeline, OzoneConfiguration> initConf,
+ TestCSMMetrics.CheckedBiFunction<Pipeline, OzoneConfiguration,
+ XceiverClientSpi, IOException> createClient,
+ TestCSMMetrics.CheckedBiFunction<DatanodeDetails, OzoneConfiguration,
+ XceiverServerSpi, IOException> createServer,
+ CheckedBiConsumer<DatanodeDetails, Pipeline, IOException> initServer)
+ throws Exception {
+ final List<XceiverServerSpi> servers = new ArrayList<>();
+ XceiverClientSpi client = null;
+ String containerName = OzoneUtils.getRequestID();
+ try {
+ final Pipeline pipeline = ContainerTestHelper.createPipeline(
+ numDatanodes);
+ final OzoneConfiguration conf = new OzoneConfiguration();
+ initConf.accept(pipeline, conf);
+
+ for (DatanodeDetails dn : pipeline.getMachines()) {
+ final XceiverServerSpi s = createServer.apply(dn, conf);
+ servers.add(s);
+ s.start();
+ initServer.accept(dn, pipeline);
+ }
+
+ client = createClient.apply(pipeline, conf);
+ client.connect();
+
+ // Before Read Chunk/Write Chunk
+ MetricsRecordBuilder metric = getMetrics(CSMMetrics.SOURCE_NAME);
+ assertCounter("NumWriteStateMachineOps", 0L, metric);
+ assertCounter("NumReadStateMachineOps", 0L, metric);
+ assertCounter("NumApplyTransactionOps", 0L, metric);
+
+ // Write Chunk
+ BlockID blockID = ContainerTestHelper.getTestBlockID(ContainerTestHelper.
+ getTestContainerID());
+ ContainerProtos.ContainerCommandRequestProto writeChunkRequest =
+ ContainerTestHelper.getWriteChunkRequest(
+ pipeline, blockID, 1024);
+ ContainerCommandResponseProto response =
+ client.sendCommand(writeChunkRequest);
+ Assert.assertEquals(ContainerProtos.Result.SUCCESS,
+ response.getResult());
+
+ metric = getMetrics(CSMMetrics.SOURCE_NAME);
+ assertCounter("NumWriteStateMachineOps", 1L, metric);
+ assertCounter("NumApplyTransactionOps", 1L, metric);
+
+ //Read Chunk
+ ContainerProtos.ContainerCommandRequestProto readChunkRequest =
+ ContainerTestHelper.getReadChunkRequest(pipeline, writeChunkRequest
+ .getWriteChunk());
+ response = client.sendCommand(readChunkRequest);
+ Assert.assertEquals(ContainerProtos.Result.SUCCESS,
+ response.getResult());
+
+ metric = getMetrics(CSMMetrics.SOURCE_NAME);
+ assertCounter("NumReadStateMachineOps", 1L, metric);
+ assertCounter("NumApplyTransactionOps", 1L, metric);
+ } finally {
+ if (client != null) {
+ client.close();
+ }
+ servers.stream().forEach(XceiverServerSpi::stop);
+ }
+ }
+
+ static XceiverServerRatis newXceiverServerRatis(
+ DatanodeDetails dn, OzoneConfiguration conf) throws IOException {
+ conf.setInt(OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
+ dn.getPort(DatanodeDetails.Port.Name.RATIS).getValue());
+ final String dir = TEST_DIR + dn.getUuid();
+ conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
+
+ final ContainerDispatcher dispatcher = new TestContainerDispatcher();
+ return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher);
+ }
+
+ static void initXceiverServerRatis(
+ RpcType rpc, DatanodeDetails dd, Pipeline pipeline) throws IOException {
+ final RaftPeer p = RatisHelper.toRaftPeer(dd);
+ final RaftClient client = RatisHelper.newRaftClient(rpc, p);
+ RaftGroupId groupId = pipeline.getId().getRaftGroupID();
+ client.reinitialize(RatisHelper.newRaftGroup(groupId,
+ pipeline.getMachines()), p.getId());
+ }
+
+ private static class TestContainerDispatcher implements ContainerDispatcher {
+ /**
+ * Dispatches commands to container layer.
+ *
+ * @param msg - Command Request
+ * @return Command Response
+ */
+ @Override
+ public ContainerCommandResponseProto dispatch(
+ ContainerCommandRequestProto msg) {
+ return ContainerTestHelper.getCreateContainerResponse(msg);
+ }
+
+ @Override
+ public void init() {
+ }
+
+ @Override
+ public void shutdown() {
+ }
+
+ @Override
+ public Handler getHandler(ContainerProtos.ContainerType containerType) {
+ return null;
+ }
+
+ @Override
+ public void setScmId(String scmId) {
+
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org