You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/02/27 17:45:10 UTC

hadoop git commit: HDFS-11463. Ozone: Add metrics for container operations and export over JMX. Contributed by Mukul Kumar Singh.

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 124ffbd55 -> ae783b199


HDFS-11463. Ozone: Add metrics for container operations and export over JMX. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ae783b19
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ae783b19
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ae783b19

Branch: refs/heads/HDFS-7240
Commit: ae783b1991e764954d8dd9aea4952c7a4599bf50
Parents: 124ffbd
Author: Anu Engineer <ae...@apache.org>
Authored: Mon Feb 27 09:44:48 2017 -0800
Committer: Anu Engineer <ae...@apache.org>
Committed: Mon Feb 27 09:44:48 2017 -0800

----------------------------------------------------------------------
 .../common/helpers/ContainerMetrics.java        | 120 +++++++++++++++++++
 .../ozone/container/common/impl/Dispatcher.java |  49 ++++++--
 .../common/interfaces/ContainerDispatcher.java  |   9 ++
 .../container/ozoneimpl/OzoneContainer.java     |   5 +-
 .../transport/server/TestContainerServer.java   |  18 ++-
 5 files changed, 191 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae783b19/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
new file mode 100644
index 0000000..55c0fbc
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerMetrics.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+import org.apache.hadoop.metrics2.lib.MutableRate;
+import org.apache.hadoop.metrics2.lib.MetricsRegistry;
+import org.apache.hadoop.metrics2.lib.MutableQuantiles;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+
+/**
+ * Maintains the Storage Container DataNode statistics and publishes them
+ * through the metrics interfaces. Registering the instance with the metrics
+ * system in {@link #create(Configuration)} is what exports it over JMX.
+ * <p>
+ * The metric fields are private; callers update them through the inc*
+ * methods, e.g. {@link #incContainerOpcMetrics(ContainerProtos.Type)}.
+ * <p>
+ * Per-operation metrics are kept in arrays indexed by
+ * {@code ContainerProtos.Type.ordinal()}.
+ */
+@InterfaceAudience.Private
+@Metrics(about="Storage Container DataNode Metrics", context="dfs")
+public class ContainerMetrics {
+  @Metric private MutableCounterLong numOps;
+  private MutableCounterLong[] numOpsArray;
+  private MutableCounterLong[] opsBytesArray;
+  private MutableRate[] opsLatency;
+  private MutableQuantiles[][] opsLatQuantiles;
+  private MetricsRegistry registry = null;
+
+  /** Creates the counters, rates and quantiles for every operation type. */
+  public ContainerMetrics(int[] intervals) {
+    // Slots are indexed by ordinal(), so name them after values()[i];
+    // Type.valueOf(i) resolves a proto number, which need not equal i.
+    final ContainerProtos.Type[] types = ContainerProtos.Type.values();
+    final int len = intervals.length;
+    this.numOpsArray = new MutableCounterLong[types.length];
+    this.opsBytesArray = new MutableCounterLong[types.length];
+    this.opsLatency = new MutableRate[types.length];
+    this.opsLatQuantiles = new MutableQuantiles[types.length][len];
+    this.registry = new MetricsRegistry("StorageContainerMetrics");
+    for (int i = 0; i < types.length; i++) {
+      final ContainerProtos.Type type = types[i];
+      numOpsArray[i] = registry.newCounter(
+          "num" + type, "number of " + type + " ops", 0L);
+      opsBytesArray[i] = registry.newCounter(
+          "bytes" + type, "bytes used by " + type + " op", 0L);
+      opsLatency[i] = registry.newRate("latency" + type, type + " op");
+      for (int j = 0; j < len; j++) {
+        opsLatQuantiles[i][j] = registry.newQuantiles(
+            type + "Nanos" + intervals[j] + "s", "latency of Container ops",
+            "ops", "latency", intervals[j]);
+      }
+    }
+  }
+
+  /** Creates the metrics and registers them with the default system. */
+  public static ContainerMetrics create(Configuration conf) {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    // Percentile measurement is off by default, by watching no intervals
+    int[] intervals =
+        conf.getInts(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY);
+    return ms.register("StorageContainerMetrics",
+        "Storage Container Node Metrics", new ContainerMetrics(intervals));
+  }
+
+  /** Counts one operation of the given type and one operation overall. */
+  public void incContainerOpcMetrics(ContainerProtos.Type type) {
+    numOps.incr();
+    numOpsArray[type.ordinal()].incr();
+  }
+
+  /** Returns how many operations of the given type have been counted. */
+  public long getContainerOpsMetrics(ContainerProtos.Type type) {
+    return numOpsArray[type.ordinal()].value();
+  }
+
+  /** Adds a latency sample to the type's rate and to each quantile. */
+  public void incContainerOpsLatencies(ContainerProtos.Type type,
+                                       long latencyNanos) {
+    opsLatency[type.ordinal()].add(latencyNanos);
+    for (MutableQuantiles q: opsLatQuantiles[type.ordinal()]) {
+      q.add(latencyNanos);
+    }
+  }
+
+  /** Adds the given number of bytes to the type's byte counter. */
+  public void incContainerBytesStats(ContainerProtos.Type type, long bytes) {
+    opsBytesArray[type.ordinal()].incr(bytes);
+  }
+
+  /** Returns the bytes accumulated so far for the given type. */
+  public long getContainerBytesMetrics(ContainerProtos.Type type) {
+    return opsBytesArray[type.ordinal()].value();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae783b19/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
index c805daa..8cdbf9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/impl/Dispatcher.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.ozone.container.common.impl;
 
 import com.google.common.base.Preconditions;
 import com.google.protobuf.ByteString;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
     .ContainerCommandRequestProto;
@@ -33,6 +34,7 @@ import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.helpers.FileUtils;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
 import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
@@ -51,52 +53,76 @@ public class Dispatcher implements ContainerDispatcher {
   static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
 
   private final ContainerManager containerManager;
-
+  private ContainerMetrics metrics;
+  private Configuration conf;
   /**
    * Constructs an OzoneContainer that receives calls from
    * XceiverServerHandler.
    *
    * @param containerManager - A class that manages containers.
    */
-  public Dispatcher(ContainerManager containerManager) {
+  public Dispatcher(ContainerManager containerManager, Configuration config) {
     Preconditions.checkNotNull(containerManager);
     this.containerManager = containerManager;
+    this.metrics = null; // created later by init()
+    this.conf = config; // read by init() to create the metrics
+  }
+
+  @Override
+  public void init() { // must run before dispatch(), which uses metrics
+    this.metrics = ContainerMetrics.create(conf);
+  }
+
+  @Override
+  public void shutdown() { // NOTE(review): no-op; metrics stay registered
   }
 
   @Override
   public ContainerCommandResponseProto dispatch(
       ContainerCommandRequestProto msg) throws IOException {
+    long startNanos = System.nanoTime(); // start of the latency sample
+    ContainerCommandResponseProto resp = null;
     Preconditions.checkNotNull(msg);
     Type cmdType = msg.getCmdType();
+    metrics.incContainerOpcMetrics(cmdType); // NPE if init() has not run
     if ((cmdType == Type.CreateContainer) ||
         (cmdType == Type.DeleteContainer) ||
         (cmdType == Type.ReadContainer) ||
         (cmdType == Type.ListContainer) ||
         (cmdType == Type.UpdateContainer)) {
-      return containerProcessHandler(msg);
+      resp = containerProcessHandler(msg);
     }
 
     if ((cmdType == Type.PutKey) ||
         (cmdType == Type.GetKey) ||
         (cmdType == Type.DeleteKey) ||
         (cmdType == Type.ListKey)) {
-      return keyProcessHandler(msg);
+      resp = keyProcessHandler(msg);
     }
 
     if ((cmdType == Type.WriteChunk) ||
         (cmdType == Type.ReadChunk) ||
         (cmdType == Type.DeleteChunk)) {
-      return chunkProcessHandler(msg);
+      resp = chunkProcessHandler(msg);
     }
 
     if ((cmdType == Type.PutSmallFile) ||
         (cmdType == Type.GetSmallFile)) {
-      return smallFileHandler(msg);
+      resp = smallFileHandler(msg);
+    }
+
+    if (resp != null) { // latency is recorded only when a handler replied
+      metrics.incContainerOpsLatencies(cmdType,
+                            System.nanoTime() - startNanos);
+      return resp;
     }
 
     return ContainerUtils.unsupportedRequest(msg);
   }
 
+  public ContainerMetrics getContainerMetrics() {
+    return metrics; // null until init() has been called
+  }
   /**
    * Handles the all Container related functionality.
    *
@@ -336,6 +362,7 @@ public class Dispatcher implements ContainerDispatcher {
         .getChunkData());
     Preconditions.checkNotNull(chunkInfo);
     byte[] data = msg.getWriteChunk().getData().toByteArray();
+    metrics.incContainerBytesStats(Type.WriteChunk, data.length);
     this.containerManager.getChunkManager().writeChunk(pipeline, keyName,
         chunkInfo, data);
     return ChunkUtils.getChunkResponse(msg);
@@ -366,6 +393,7 @@ public class Dispatcher implements ContainerDispatcher {
     Preconditions.checkNotNull(chunkInfo);
     byte[] data = this.containerManager.getChunkManager().readChunk(pipeline,
         keyName, chunkInfo);
+    metrics.incContainerBytesStats(Type.ReadChunk, data.length);
     return ChunkUtils.getReadChunkResponse(msg, data, chunkInfo);
   }
 
@@ -417,6 +445,8 @@ public class Dispatcher implements ContainerDispatcher {
     KeyData keyData = KeyData.getFromProtoBuf(msg.getPutKey().getKeyData());
     Preconditions.checkNotNull(keyData);
     this.containerManager.getKeyManager().putKey(pipeline, keyData);
+    long numBytes = keyData.getProtoBufMessage().toByteArray().length;
+    metrics.incContainerBytesStats(Type.PutKey, numBytes);
     return KeyUtils.getKeyResponse(msg);
   }
 
@@ -438,6 +468,8 @@ public class Dispatcher implements ContainerDispatcher {
     Preconditions.checkNotNull(keyData);
     KeyData responseData =
         this.containerManager.getKeyManager().getKey(keyData);
+    long numBytes = responseData.getProtoBufMessage().toByteArray().length;
+    metrics.incContainerBytesStats(Type.GetKey, numBytes);
     return KeyUtils.getKeyDataResponse(msg, responseData);
   }
 
@@ -492,6 +524,7 @@ public class Dispatcher implements ContainerDispatcher {
         .getChunkInfo());
     byte[] data = msg.getPutSmallFile().getData().toByteArray();
 
+    metrics.incContainerBytesStats(Type.PutSmallFile, data.length);
     this.containerManager.getChunkManager().writeChunk(pipeline, keyData
         .getKeyName(), chunkInfo, data);
     List<ContainerProtos.ChunkInfo> chunks = new LinkedList<>();
@@ -518,12 +551,14 @@ public class Dispatcher implements ContainerDispatcher {
     }
     Pipeline pipeline =
         Pipeline.getFromProtoBuf(msg.getGetSmallFile().getKey().getPipeline());
+    long bytes = 0;
     Preconditions.checkNotNull(pipeline);
     KeyData keyData = KeyData.getFromProtoBuf(msg.getGetSmallFile()
         .getKey().getKeyData());
     KeyData data = this.containerManager.getKeyManager().getKey(keyData);
     ContainerProtos.ChunkInfo c = null;
     for (ContainerProtos.ChunkInfo chunk : data.getChunks()) {
+      bytes += chunk.getSerializedSize();
       ByteString current =
           ByteString.copyFrom(this.containerManager.getChunkManager().readChunk(
               pipeline, keyData.getKeyName(), ChunkInfo.getFromProtoBuf(
@@ -531,7 +566,7 @@ public class Dispatcher implements ContainerDispatcher {
       dataBuf = dataBuf.concat(current);
       c = chunk;
     }
-
+    metrics.incContainerBytesStats(Type.GetSmallFile,  bytes);
     return FileUtils.getGetSmallFileResponse(msg, dataBuf.toByteArray(),
         ChunkInfo.getFromProtoBuf(c));
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae783b19/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
index 6ad8377..48ca5de 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDispatcher.java
@@ -41,4 +41,13 @@ public interface ContainerDispatcher {
   ContainerCommandResponseProto dispatch(ContainerCommandRequestProto msg)
       throws IOException;
 
+  /**
+   * Initialize the Dispatcher.
+   */
+  void init();
+
+  /**
+   * Shutdown Dispatcher services.
+   */
+  void shutdown();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae783b19/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index d088b4e..0c3cd91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -81,7 +81,7 @@ public class OzoneContainer {
     this.keyManager = new KeyManagerImpl(manager, ozoneConfig);
     manager.setKeyManager(this.keyManager);
 
-    this.dispatcher = new Dispatcher(manager);
+    this.dispatcher = new Dispatcher(manager, this.ozoneConfig);
     server = new XceiverServer(this.ozoneConfig, this.dispatcher);
   }
 
@@ -92,6 +92,7 @@ public class OzoneContainer {
    */
   public void start() throws IOException {
+    dispatcher.init(); // before server.start(): dispatch() needs the metrics
     server.start();
   }
 
   /**
@@ -129,6 +130,8 @@ public class OzoneContainer {
   public void stop() {
     LOG.info("Attempting to stop container services.");
     server.stop();
+    dispatcher.shutdown();
+
     try {
       this.manager.writeLock();
       this.chunkManager.shutdown();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ae783b19/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
index 10ed9d1..6b9e266 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/transport/server/TestContainerServer.java
@@ -108,8 +108,10 @@ public class TestContainerServer {
       conf.setInt(OzoneConfigKeys.DFS_CONTAINER_IPC_PORT,
           pipeline.getLeader().getContainerPort());
 
-      server = new XceiverServer(conf, new Dispatcher(
-          mock(ContainerManager.class)));
+      Dispatcher dispatcher =
+              new Dispatcher(mock(ContainerManager.class), conf);
+      dispatcher.init();
+      server = new XceiverServer(conf, dispatcher);
       client = new XceiverClient(pipeline, conf);
 
       server.start();
@@ -120,6 +122,10 @@ public class TestContainerServer {
       ContainerCommandResponseProto response = client.sendCommand(request);
       Assert.assertTrue(request.getTraceID().equals(response.getTraceID()));
       Assert.assertEquals(response.getResult(), ContainerProtos.Result.SUCCESS);
+      Assert.assertTrue(dispatcher.
+                          getContainerMetrics().
+                            getContainerOpsMetrics(
+                              ContainerProtos.Type.CreateContainer)== 1);
     } finally {
       if (client != null) {
         client.close();
@@ -143,5 +149,13 @@ public class TestContainerServer {
     dispatch(ContainerCommandRequestProto msg) throws IOException {
       return ContainerTestHelper.getCreateContainerResponse(msg);
     }
+
+    @Override
+    public void init() { // no-op in this test dispatcher
+    }
+
+    @Override
+    public void shutdown() { // no-op in this test dispatcher
+    }
   }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org