Posted to common-commits@hadoop.apache.org by cl...@apache.org on 2017/09/13 20:00:49 UTC

hadoop git commit: HDFS-12268. Ozone: Add metrics for pending storage container requests. Contributed by Yiqun Lin.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 1aa7d3f5b -> c29aff44c


HDFS-12268. Ozone: Add metrics for pending storage container requests. Contributed by Yiqun Lin.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c29aff44
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c29aff44
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c29aff44

Branch: refs/heads/HDFS-7240
Commit: c29aff44cfc78a2839f15e8cde3b80f0a31c80dd
Parents: 1aa7d3f
Author: Chen Liang <cl...@apache.org>
Authored: Wed Sep 13 13:00:26 2017 -0700
Committer: Chen Liang <cl...@apache.org>
Committed: Wed Sep 13 13:00:26 2017 -0700

----------------------------------------------------------------------
 .../apache/hadoop/scm/XceiverClientHandler.java |  35 +++--
 .../apache/hadoop/scm/XceiverClientManager.java |  15 ++
 .../apache/hadoop/scm/XceiverClientMetrics.java |  92 +++++++++++
 .../src/site/markdown/OzoneMetrics.md           |  11 ++
 .../ozone/scm/TestXceiverClientMetrics.java     | 151 +++++++++++++++++++
 5 files changed, 294 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c29aff44/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
index 93d4438..38fc8f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientHandler.java
@@ -18,6 +18,8 @@
 package org.apache.hadoop.scm;
 
 import com.google.common.base.Preconditions;
+import com.sun.tools.javac.util.Pair;
+
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
@@ -27,6 +29,7 @@ import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
     .ContainerCommandResponseProto;
 
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,7 +39,6 @@ import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
-
 /**
  * Netty client handler.
  */
@@ -45,18 +47,21 @@ public class XceiverClientHandler extends
 
   static final Logger LOG = LoggerFactory.getLogger(XceiverClientHandler.class);
   private final ConcurrentMap<String,
-      CompletableFuture<ContainerCommandResponseProto>> responses =
+      Pair<CompletableFuture<ContainerCommandResponseProto>, Long>> responses =
       new ConcurrentHashMap<>();
 
   private final Pipeline pipeline;
   private volatile Channel channel;
 
+  private XceiverClientMetrics metrics;
+
   /**
    * Constructs a client that can communicate to a container server.
    */
   public XceiverClientHandler(Pipeline pipeline) {
     super(false);
     this.pipeline = pipeline;
+    this.metrics = XceiverClientManager.getXceiverClientMetrics();
   }
 
   /**
@@ -76,11 +81,18 @@ public class XceiverClientHandler extends
       ContainerProtos.ContainerCommandResponseProto msg)
       throws Exception {
     Preconditions.checkNotNull(msg);
+    metrics.decrPendingContainerOpsMetrics(msg.getCmdType());
+
     String key = msg.getTraceID();
-    CompletableFuture<ContainerCommandResponseProto> future =
+    Pair<CompletableFuture<ContainerCommandResponseProto>, Long> future =
         responses.remove(key);
+
     if (future != null) {
-      future.complete(msg);
+      future.fst.complete(msg);
+
+      long requestTime = future.snd;
+      metrics.addContainerOpsLatency(msg.getCmdType(),
+          Time.monotonicNowNanos() - requestTime);
     } else {
       LOG.error("A reply received for message that was not queued. trace " +
           "ID: {}", msg.getTraceID());
@@ -130,11 +142,14 @@ public class XceiverClientHandler extends
     if(StringUtils.isEmpty(request.getTraceID())) {
       throw new IllegalArgumentException("Invalid trace ID");
     }
-
-    CompletableFuture<ContainerCommandResponseProto> response =
-        new CompletableFuture<>();
-
-    CompletableFuture<ContainerCommandResponseProto> previous =
+    metrics.incrPendingContainerOpsMetrics(request.getCmdType());
+
+    CompletableFuture<ContainerCommandResponseProto> future
+        = new CompletableFuture<>();
+    Pair<CompletableFuture<ContainerCommandResponseProto>, Long> response =
+        new Pair<CompletableFuture<ContainerCommandResponseProto>,
+        Long>(future, Time.monotonicNowNanos());
+    Pair<CompletableFuture<ContainerCommandResponseProto>, Long> previous =
         responses.putIfAbsent(request.getTraceID(), response);
 
     if (previous != null) {
@@ -147,6 +162,6 @@ public class XceiverClientHandler extends
     }
 
     channel.writeAndFlush(request);
-    return response;
+    return response.fst;
   }
 }
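
The handler change above keeps each in-flight request paired with its submission
timestamp, so the per-type latency can be recorded when the response arrives.
The sketch below shows the same bookkeeping with a small private holder class in
place of the JDK-internal com.sun.tools.javac.util.Pair; the holder name is
illustrative and not part of this commit, while the metrics and Time calls are
the ones added in the diff above.

    // Illustrative only: pair the response future with its enqueue time.
    private static final class PendingRequest {
      final CompletableFuture<ContainerCommandResponseProto> future =
          new CompletableFuture<>();
      final long startTimeNanos = Time.monotonicNowNanos();
    }

    // On send:
    //   metrics.incrPendingContainerOpsMetrics(request.getCmdType());
    //   responses.putIfAbsent(request.getTraceID(), new PendingRequest());
    // On receive:
    //   metrics.decrPendingContainerOpsMetrics(msg.getCmdType());
    //   PendingRequest pending = responses.remove(msg.getTraceID());
    //   pending.future.complete(msg);
    //   metrics.addContainerOpsLatency(msg.getCmdType(),
    //       Time.monotonicNowNanos() - pending.startTimeNanos);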

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c29aff44/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
index 508c004..8174e84 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientManager.java
@@ -62,6 +62,7 @@ public class XceiverClientManager implements Closeable {
   private final Cache<String, XceiverClientSpi> clientCache;
   private final boolean useRatis;
 
+  private static XceiverClientMetrics metrics;
   /**
    * Creates a new XceiverClientManager.
    *
@@ -164,6 +165,10 @@ public class XceiverClientManager implements Closeable {
     //closing is done through RemovalListener
     clientCache.invalidateAll();
     clientCache.cleanUp();
+
+    if (metrics != null) {
+      metrics.unRegister();
+    }
   }
 
   /**
@@ -197,4 +202,14 @@ public class XceiverClientManager implements Closeable {
     return OzoneProtos.ReplicationType.STAND_ALONE;
   }
 
+  /**
+   * Get xceiver client metric.
+   */
+  public synchronized static XceiverClientMetrics getXceiverClientMetrics() {
+    if (metrics == null) {
+      metrics = XceiverClientMetrics.create();
+    }
+
+    return metrics;
+  }
 }
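
XceiverClientManager now owns a single, lazily created XceiverClientMetrics
source that is shared by every handler and unregistered when the manager is
closed. A minimal usage sketch, assuming only the getter added above and the
command type referenced elsewhere in this commit:

    // All client handlers share one metrics source, created on first access.
    XceiverClientMetrics metrics = XceiverClientManager.getXceiverClientMetrics();
    long pendingCreates =
        metrics.getContainerOpsMetrics(ContainerProtos.Type.CreateContainer);
    // Closing the manager (clientManager.close()) unregisters the source.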

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c29aff44/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java
new file mode 100644
index 0000000..6359db1
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/XceiverClientMetrics.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.scm;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+
+/**
+ * The client metrics for the Storage Container protocol.
+ */
+@InterfaceAudience.Private
+@Metrics(about = "Storage Container Client Metrics", context = "dfs")
+public class XceiverClientMetrics {
+  public static final String SOURCE_NAME = XceiverClientMetrics.class
+      .getSimpleName();
+
+  private @Metric MutableCounterLong pendingOps;
+  private MutableCounterLong[] pendingOpsArray;
+  private MutableRate[] containerOpsLatency;
+  private MetricsRegistry registry;
+
+  public XceiverClientMetrics() {
+    int numEnumEntries = ContainerProtos.Type.values().length;
+    this.registry = new MetricsRegistry(SOURCE_NAME);
+
+    this.pendingOpsArray = new MutableCounterLong[numEnumEntries];
+    this.containerOpsLatency = new MutableRate[numEnumEntries];
+    for (int i = 0; i < numEnumEntries; i++) {
+      pendingOpsArray[i] = registry.newCounter(
+          "numPending" + ContainerProtos.Type.valueOf(i + 1),
+          "number of pending" + ContainerProtos.Type.valueOf(i + 1) + " ops",
+          (long) 0);
+
+      containerOpsLatency[i] = registry.newRate(
+          ContainerProtos.Type.valueOf(i + 1) + "Latency",
+          "latency of " + ContainerProtos.Type.valueOf(i + 1)
+          + " ops");
+    }
+  }
+
+  public static XceiverClientMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME, "Storage Container Client Metrics",
+        new XceiverClientMetrics());
+  }
+
+  public void incrPendingContainerOpsMetrics(ContainerProtos.Type type) {
+    pendingOps.incr();
+    pendingOpsArray[type.ordinal()].incr();
+  }
+
+  public void decrPendingContainerOpsMetrics(ContainerProtos.Type type) {
+    pendingOps.incr(-1);
+    pendingOpsArray[type.ordinal()].incr(-1);
+  }
+
+  public void addContainerOpsLatency(ContainerProtos.Type type,
+      long latencyNanos) {
+    containerOpsLatency[type.ordinal()].add(latencyNanos);
+  }
+
+  public long getContainerOpsMetrics(ContainerProtos.Type type) {
+    return pendingOpsArray[type.ordinal()].value();
+  }
+
+  public void unRegister() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+}
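
One detail worth noting in the constructor above: the metric names are built
from the 1-based protobuf value (ContainerProtos.Type.valueOf(i + 1)), while the
arrays are indexed by the 0-based Java enum ordinal. The two stay in step as
long as the command types are numbered consecutively from 1, which this code
assumes. An illustrative check (not part of the commit):

    ContainerProtos.Type first = ContainerProtos.Type.valueOf(1);
    // 1-based number used when composing the "numPending<Type>" metric name.
    assert first.getNumber() == 1;
    // 0-based ordinal used as the slot in pendingOpsArray/containerOpsLatency.
    assert first.ordinal() == 0;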

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c29aff44/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md
index 21e3474..2548959 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/OzoneMetrics.md
@@ -44,6 +44,17 @@ many times a specific operation has been performed.
 Eg.`NumCreateContainer` tells us how many times create container has been
 invoked on this datanode.
 
+*Total number of pending operations* - This is an array which counts how
+many times a specific operation is waiting to be processed from the client
+point of view.
+Eg.`NumPendingCreateContainer` tells us how many create container requests are
+waiting to be processed.
+
+*Average latency of each pending operation in nanoseconds* - The average latency
+of the operation from the client point of view.
+Eg. `CreateContainerLatencyAvgTime` - This tells us the average latency of
+Create Container from the client point of view.
+
 *Number of bytes involved in a specific command* - This is an array that is
 maintained for all operations, but makes sense only for read and write
 operations.
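
For reference, the new counters can be read in a unit test through the metrics
test helpers, much as TestXceiverClientMetrics below does. A minimal sketch,
using the names registered by XceiverClientMetrics:

    // Illustrative only: fetch the client metrics record and read two counters.
    MetricsRecordBuilder rb = getMetrics(XceiverClientMetrics.SOURCE_NAME);
    long pendingOps = getLongCounter("PendingOps", rb);                    // all types
    long pendingCreates = getLongCounter("numPendingCreateContainer", rb); // one type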

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c29aff44/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
new file mode 100644
index 0000000..7fd41a7
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/TestXceiverClientMetrics.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.scm;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandRequestProto;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos.ContainerCommandResponseProto;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.ContainerTestHelper;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.scm.XceiverClientManager;
+import org.apache.hadoop.scm.XceiverClientMetrics;
+import org.apache.hadoop.scm.XceiverClientSpi;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * This class tests the metrics of XceiverClient.
+ */
+public class TestXceiverClientMetrics {
+  private static OzoneConfiguration config;
+  private static MiniOzoneCluster cluster;
+  private static StorageContainerLocationProtocolClientSideTranslatorPB
+      storageContainerLocationClient;
+
+  @BeforeClass
+  public static void init() throws IOException {
+    config = new OzoneConfiguration();
+    cluster = new MiniOzoneCluster.Builder(config)
+        .numDataNodes(1)
+        .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+    storageContainerLocationClient = cluster
+        .createStorageContainerLocationClient();
+  }
+
+  @AfterClass
+  public static void shutdown() {
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testMetrics() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+
+    String containerName = "container" + RandomStringUtils.randomNumeric(10);
+    Pipeline pipeline = storageContainerLocationClient.allocateContainer(
+        clientManager.getType(), clientManager.getFactor(), containerName);
+    XceiverClientSpi client = clientManager.acquireClient(pipeline);
+
+    ContainerCommandRequestProto request = ContainerTestHelper
+        .getCreateContainerRequest(containerName);
+    client.sendCommand(request);
+
+    MetricsRecordBuilder containerMetrics = getMetrics(
+        XceiverClientMetrics.SOURCE_NAME);
+    // The above request is sent synchronously, so there will be no
+    // pending requests.
+    assertCounter("PendingOps", 0L, containerMetrics);
+    assertCounter("numPendingCreateContainer", 0L, containerMetrics);
+    // the counter value of average latency metric should be increased
+    assertCounter("CreateContainerLatencyNumOps", 1L, containerMetrics);
+
+    List<CompletableFuture<ContainerCommandResponseProto>> computeResults
+        = new ArrayList<>();
+    int numRequest = 10;
+    // start new thread to send async requests
+    Thread sendThread = new Thread(() -> {
+      while (true) {
+        try {
+          // use async interface for testing pending metrics
+          for (int i = 0; i < numRequest; i++) {
+            String keyName = OzoneUtils.getRequestID();
+            ContainerProtos.ContainerCommandRequestProto smallFileRequest;
+
+            smallFileRequest = ContainerTestHelper.getWriteSmallFileRequest(
+                client.getPipeline(), containerName, keyName, 1024);
+            CompletableFuture<ContainerProtos.ContainerCommandResponseProto>
+                response = client.sendCommandAsync(smallFileRequest);
+            computeResults.add(response);
+          }
+          Thread.sleep(1000);
+        } catch (Exception ignored) {
+        }
+      }
+    });
+    sendThread.start();
+
+    GenericTestUtils.waitFor(() -> {
+      // check if pending metric count is increased
+      MetricsRecordBuilder metric =
+          getMetrics(XceiverClientMetrics.SOURCE_NAME);
+      long pendingOps = getLongCounter("PendingOps", metric);
+      long pendingPutSmallFileOps =
+          getLongCounter("numPendingPutSmallFile", metric);
+      return pendingOps > 0 && pendingPutSmallFileOps > 0;
+    }, 100, 60000);
+    sendThread.interrupt();
+
+    // Wait for all futures being done.
+    GenericTestUtils.waitFor(() -> {
+      for (CompletableFuture future : computeResults) {
+        if (!future.isDone()) {
+          return false;
+        }
+      }
+
+      return true;
+    }, 100, 60000);
+
+    // the counter value of pending metrics should be decreased to 0
+    containerMetrics = getMetrics(XceiverClientMetrics.SOURCE_NAME);
+    containerMetrics = getMetrics(XceiverClientMetrics.SOURCE_NAME);
+    assertCounter("PendingOps", 0L, containerMetrics);
+    assertCounter("numPendingPutSmallFile", 0L, containerMetrics);
+
+    clientManager.close();
+  }
+}

