You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2017/09/13 20:00:49 UTC
hadoop git commit: HDFS-12268. Ozone: Add metrics for pending storage
container requests. Contributed by Yiqun Lin.
Repository: hadoop
Updated Branches:
refs/heads/HDFS-7240 1aa7d3f5b -> c29aff44c
HDFS-12268. Ozone: Add metrics for pending storage container requests. Contributed by Yiqun Lin.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c29aff44
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c29aff44
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c29aff44
Branch: refs/heads/HDFS-7240
Commit: c29aff44cfc78a2839f15e8cde3b80f0a31c80dd
Parents: 1aa7d3f
Author: Chen Liang <cl...@apache.org>
Authored: Wed Sep 13 13:00:26 2017 -0700
Committer: Chen Liang <cl...@apache.org>
Committed: Wed Sep 13 13:00:26 2017 -0700
----------------------------------------------------------------------
.../apache/hadoop/scm/XceiverClientHandler.java | 35 +++--
.../apache/hadoop/scm/XceiverClientManager.java | 15 ++
.../apache/hadoop/scm/XceiverClientMetrics.java | 92 +++++++++++
.../src/site/markdown/OzoneMetrics.md | 11 ++
.../ozone/scm/TestXceiverClientMetrics.java | 151 +++++++++++++++++++
5 files changed, 294 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c29aff44/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
index 93d4438..38fc8f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.scm;
import com.google.common.base.Preconditions;
+import com.sun.tools.javac.util.Pair;
+
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
@@ -27,6 +29,7 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
.ContainerCommandResponseProto;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -36,7 +39,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-
/**
* Netty client handler.
*/
@@ -45,18 +47,21 @@ public class XceiverClientHandler extends
static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class);
private final ConcurrentMap<String,
- CompletableFuture<ContainerCommandResponseProto>> responses =
+ Pair<CompletableFuture<ContainerCommandResponseProto>, Long>> responses =
new ConcurrentHashMap<>();
private final Pipeline pipeline;
private volatile Channel channel;
+ private XceiverClientMetrics metrics;
+
/**
* Constructs a client that can communicate to a container server.
*/
public XceiverClientHandler(Pipeline pipeline) {
super(false);
this.pipeline = pipeline;
+ this.metrics = XceiverClientManager.getXceiverClientMetrics();
}
/**
@@ -76,11 +81,18 @@ public class XceiverClientHandler extends
ContainerProtos.ContainerCommandResponseProto msg)
throws Exception {
Preconditions.checkNotNull(msg);
+ metrics.decrPendingContainerOpsMetrics(msg.getCmdType());
+
String key = msg.getTraceID();
- CompletableFuture<ContainerCommandResponseProto> future =
+ Pair<CompletableFuture<ContainerCommandResponseProto>, Long> future =
responses.remove(key);
+
if (future != null) {
- future.complete(msg);
+ future.fst.complete(msg);
+
+ long requestTime = future.snd;
+ metrics.addContainerOpsLatency(msg.getCmdType(),
+ Time.monotonicNowNanos() - requestTime);
} else {
LOG.error("A reply received for message that was not queued. trace " +
"ID: {}", msg.getTraceID());
@@ -130,11 +142,14 @@ public class XceiverClientHandler extends
if(StringUtils.isEmpty(request.getTraceID())) {
throw new IllegalArgumentException("Invalid trace ID");
}
-
- CompletableFuture<ContainerCommandResponseProto> response =
- new CompletableFuture<>();
-
- CompletableFuture<ContainerCommandResponseProto> previous =
+ metrics.incrPendingContainerOpsMetrics(request.getCmdType());
+
+ CompletableFuture<ContainerCommandResponseProto> future
+ = new CompletableFuture<>();
+ Pair<CompletableFuture<ContainerCommandResponseProto>, Long> response =
+ new Pair<CompletableFuture<ContainerCommandResponseProto>,
+ Long>(future, Time.monotonicNowNanos());
+ Pair<CompletableFuture<ContainerCommandResponseProto>, Long> previous =
responses.putIfAbsent(request.getTraceID(), response);
if (previous != null) {
@@ -147,6 +162,6 @@ public class XceiverClientHandler extends
}
channel.writeAndFlush(request);
- return response;
+ return response.fst;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c29aff44/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
index 508c004..8174e84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
@@ -62,6 +62,7 @@ public class XceiverClientManager implements Closeable {
private final Cache<String, XceiverClientSpi> clientCache;
private final boolean useRatis;
+ private static XceiverClientMetrics metrics;
/**
* Creates a new XceiverClientManager.
*
@@ -164,6 +165,10 @@ public class XceiverClientManager implements Closeable {
//closing is done through RemovalListener
clientCache.invalidateAll();
clientCache.cleanUp();
+
+ if (metrics != null) {
+ metrics.unRegister();
+ }
}
/**
@@ -197,4 +202,14 @@ public class XceiverClientManager implements Closeable {
return OzoneProtos.ReplicationType.STAND_ALONE;
}
+ /**
+ * Get xceiver client metric.
+ */
+ public synchronized static XceiverClientMetrics getXceiverClientMetrics() {
+ if (metrics == null) {
+ metrics = XceiverClientMetrics.create();
+ }
+
+ return metrics;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c29aff44/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java
new file mode 100644
index 0000000..6359db1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.scm;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+/**
+ * The client metrics for the Storage Container protocol.
+ */
+@InterfaceAudience.Private
+@Metrics(about = "Storage Container Client Metrics", context = "dfs")
+public class XceiverClientMetrics {
+ public static final String SOURCE_NAME = XceiverClientMetrics.class
+ .getSimpleName();
+
+ private @Metric MutableCounterLong pendingOps;
+ private MutableCounterLong[] pendingOpsArray;
+ private MutableRate[] containerOpsLatency;
+ private MetricsRegistry registry;
+
+ public XceiverClientMetrics() {
+ int numEnumEntries = ContainerProtos.Type.values().length;
+ this.registry = new MetricsRegistry(SOURCE_NAME);
+
+ this.pendingOpsArray = new MutableCounterLong[numEnumEntries];
+ this.containerOpsLatency = new MutableRate[numEnumEntries];
+ for (int i = 0; i < numEnumEntries; i++) {
+ pendingOpsArray[i] = registry.newCounter(
+ "numPending" + ContainerProtos.Type.valueOf(i + 1),
+ "number of pending" + ContainerProtos.Type.valueOf(i + 1) + " ops",
+ (long) 0);
+
+ containerOpsLatency[i] = registry.newRate(
+ ContainerProtos.Type.valueOf(i + 1) + "Latency",
+ "latency of " + ContainerProtos.Type.valueOf(i + 1)
+ + " ops");
+ }
+ }
+
+ public static XceiverClientMetrics create() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ return ms.register(SOURCE_NAME, "Storage Container Client Metrics",
+ new XceiverClientMetrics());
+ }
+
+ public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) {
+ pendingOps.incr();
+ pendingOpsArray[type.ordinal()].incr();
+ }
+
+ public void decrPendingContainerOpsMetrics(ContainerProtos.Type type) {
+ pendingOps.incr(-1);
+ pendingOpsArray[type.ordinal()].incr(-1);
+ }
+
+ public void addContainerOpsLatency(ContainerProtos.Type type,
+ long latencyNanos) {
+ containerOpsLatency[type.ordinal()].add(latencyNanos);
+ }
+
+ public long getContainerOpsMetrics(ContainerProtos.Type type) {
+ return pendingOpsArray[type.ordinal()].value();
+ }
+
+ public void unRegister() {
+ MetricsSystem ms = DefaultMetricsSystem.instance();
+ ms.unregisterSource(SOURCE_NAME);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c29aff44/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md
index 21e3474..2548959 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md
@@ -44,6 +44,17 @@ many times a specific operation has been performed.
Eg.`NumCreateContainer` tells us how many times create container has been
invoked on this datanode.
+*Total number of pending operations* - This is an array which counts, for
+each operation type, how many requests are currently waiting for a response
+from the client point of view.
+Eg.`NumPendingCreateContainer` tells us how many create container requests are
+waiting to be processed.
+
+*Average latency of each operation in nanoseconds* - The average time between
+sending a request and receiving its response, from the client point of view.
+Eg. `CreateContainerLatencyAvgTime` - This tells us the average latency of
+Create Container from the client point of view.
+
*Number of bytes involved in a specific command* - This is an array that is
maintained for all operations, but makes sense only for read and write
operations.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/c29aff44/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
new file mode 100644
index 0000000..7fd41a7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientMetrics;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This class tests the metrics of XceiverClient.
+ */
+public class TestXceiverClientMetrics {
+ private static OzoneConfiguration config;
+ private static MiniOzoneCluster cluster;
+ private static StorageContainerLocationProtocolClientSideTranslatorPB
+ storageContainerLocationClient;
+
+ @BeforeClass
+ public static void init() throws IOException {
+ config = new OzoneConfiguration();
+ cluster = new MiniOzoneCluster.Builder(config)
+ .numDataNodes(1)
+ .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+ storageContainerLocationClient = cluster
+ .createStorageContainerLocationClient();
+ }
+
+ @AfterClass
+ public static void shutdown() {
+ cluster.shutdown();
+ }
+
+ @Test
+ public void testMetrics() throws Exception {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ XceiverClientManager clientManager = new XceiverClientManager(conf);
+
+ String containerName = "container" + RandomStringUtils.randomNumeric(10);
+ Pipeline pipeline = storageContainerLocationClient.allocateContainer(
+ clientManager.getType(), clientManager.getFactor(), containerName);
+ XceiverClientSpi client = clientManager.acquireClient(pipeline);
+
+ ContainerCommandRequestProto request = ContainerTestHelper
+ .getCreateContainerRequest(containerName);
+ client.sendCommand(request);
+
+ MetricsRecordBuilder containerMetrics = getMetrics(
+ XceiverClientMetrics.SOURCE_NAME);
+ // Above request command is in a synchronous way, so there will be no
+ // pending requests.
+ assertCounter("PendingOps", 0L, containerMetrics);
+ assertCounter("numPendingCreateContainer", 0L, containerMetrics);
+ // the counter value of average latency metric should be increased
+ assertCounter("CreateContainerLatencyNumOps", 1L, containerMetrics);
+
+ List<CompletableFuture<ContainerCommandResponseProto>> computeResults
+ = new ArrayList<>();
+ int numRequest = 10;
+ // start new thread to send async requests
+ Thread sendThread = new Thread(() -> {
+ while (true) {
+ try {
+ // use async interface for testing pending metrics
+ for (int i = 0; i < numRequest; i++) {
+ String keyName = OzoneUtils.getRequestID();
+ ContainerProtos.ContainerCommandRequestProto smallFileRequest;
+
+ smallFileRequest = ContainerTestHelper.getWriteSmallFileRequest(
+ client.getPipeline(), containerName, keyName, 1024);
+ CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+ response = client.sendCommandAsync(smallFileRequest);
+ computeResults.add(response);
+ }
+ Thread.sleep(1000);
+ } catch (Exception ignored) {
+ }
+ }
+ });
+ sendThread.start();
+
+ GenericTestUtils.waitFor(() -> {
+ // check if pending metric count is increased
+ MetricsRecordBuilder metric =
+ getMetrics(XceiverClientMetrics.SOURCE_NAME);
+ long pendingOps = getLongCounter("PendingOps", metric);
+ long pendingPutSmallFileOps =
+ getLongCounter("numPendingPutSmallFile", metric);
+ return pendingOps > 0 && pendingPutSmallFileOps > 0;
+ }, 100, 60000);
+ sendThread.interrupt();
+
+ // Wait for all futures being done.
+ GenericTestUtils.waitFor(() -> {
+ for (CompletableFuture future : computeResults) {
+ if (!future.isDone()) {
+ return false;
+ }
+ }
+
+ return true;
+ }, 100, 60000);
+
+ // the counter value of pending metrics should be decreased to 0
+ containerMetrics = getMetrics(XceiverClientMetrics.SOURCE_NAME);
+ containerMetrics = getMetrics(XceiverClientMetrics.SOURCE_NAME);
+ assertCounter("PendingOps", 0L, containerMetrics);
+ assertCounter("numPendingPutSmallFile", 0L, containerMetrics);
+
+ clientManager.close();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org