Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2019/02/24 23:45:55 UTC

[hadoop] branch trunk updated: HDDS-1070. Adding Node and Pipeline related metrics in SCM. Contributed by Nandakumar.

This is an automated email from the ASF dual-hosted git repository.

aengineer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 92b1fdc  HDDS-1070. Adding Node and Pipeline related metrics in SCM. Contributed by Nandakumar.
92b1fdc is described below

commit 92b1fdceced4f670bf14e094580db73ebb83c9bd
Author: Anu Engineer <ae...@apache.org>
AuthorDate: Sun Feb 24 15:39:41 2019 -0800

    HDDS-1070. Adding Node and Pipeline related metrics in SCM.
    Contributed by Nandakumar.
---
 .../hadoop/hdds/scm/node/SCMNodeManager.java       |   9 +-
 .../hadoop/hdds/scm/node/SCMNodeMetrics.java       |  93 ++++++++++++++
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |   2 +-
 .../hdds/scm/pipeline/PipelineManagerMXBean.java   |  38 ++++++
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  39 +++++-
 .../hdds/scm/pipeline/SCMPipelineMetrics.java      | 108 +++++++++++++++++
 .../hadoop/ozone/scm/node/TestSCMNodeMetrics.java  | 135 +++++++++++++++++++++
 .../apache/hadoop/ozone/scm/node/package-info.java |  24 ++++
 .../scm/pipeline/TestPipelineManagerMXBean.java    |  97 +++++++++++++++
 .../ozone/scm/pipeline/TestSCMPipelineMetrics.java |  98 +++++++++++++++
 .../hadoop/ozone/scm/pipeline/package-info.java    |  24 ++++
 11 files changed, 663 insertions(+), 4 deletions(-)
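
The new sources follow the standard Hadoop metrics2 pattern: a final class
annotated with @Metrics, MutableCounterLong fields annotated with @Metric, and
register/unregisterSource calls against DefaultMetricsSystem. Once registered,
the counters surface through the configured metrics sinks and as JMX MBeans.
Below is a minimal in-process reader sketch; the exact ObjectName is an
assumption, since metrics2 sources are conventionally exported as
Hadoop:service=<prefix>,name=<source name> and the prefix depends on how the
metrics system was initialized:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public final class ScmMetricsReader {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        // Assumed name; see the naming convention noted above.
        ObjectName name = new ObjectName(
            "Hadoop:service=StorageContainerManager,name=SCMNodeMetrics");
        System.out.println("NumHBProcessed = "
            + mbs.getAttribute(name, "NumHBProcessed"));
        System.out.println("NumHBProcessingFailed = "
            + mbs.getAttribute(name, "NumHBProcessingFailed"));
      }
    }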

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 16a10ac..e457b13 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -91,10 +91,9 @@ public class SCMNodeManager implements NodeManager {
   private final String clusterID;
   private final VersionInfo version;
   private final CommandQueue commandQueue;
+  private final SCMNodeMetrics metrics;
   // Node manager MXBean
   private ObjectName nmInfoBean;
-
-  // Node pool manager.
   private final StorageContainerManager scmManager;
 
   /**
@@ -103,6 +102,7 @@ public class SCMNodeManager implements NodeManager {
   public SCMNodeManager(OzoneConfiguration conf, String clusterID,
       StorageContainerManager scmManager, EventPublisher eventPublisher)
       throws IOException {
+    this.metrics = SCMNodeMetrics.create();
     this.nodeStateManager = new NodeStateManager(conf, eventPublisher);
     this.clusterID = clusterID;
     this.version = VersionInfo.getLatestVersion();
@@ -185,6 +185,7 @@ public class SCMNodeManager implements NodeManager {
   @Override
   public void close() throws IOException {
     unregisterMXBean();
+    metrics.unRegister();
   }
 
   /**
@@ -257,7 +258,9 @@ public class SCMNodeManager implements NodeManager {
         "DatanodeDetails.");
     try {
       nodeStateManager.updateLastHeartbeatTime(datanodeDetails);
+      metrics.incNumHBProcessed();
     } catch (NodeNotFoundException e) {
+      metrics.incNumHBProcessingFailed();
       LOG.error("SCM trying to process heartbeat from an " +
           "unregistered node {}. Ignoring the heartbeat.", datanodeDetails);
     }
@@ -287,8 +290,10 @@ public class SCMNodeManager implements NodeManager {
       DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails);
       if (nodeReport != null) {
         datanodeInfo.updateStorageReports(nodeReport.getStorageReportList());
+        metrics.incNumNodeReportProcessed();
       }
     } catch (NodeNotFoundException e) {
+      metrics.incNumNodeReportProcessingFailed();
       LOG.warn("Got node report from unregistered datanode {}",
           datanodeDetails);
     }
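
The instrumentation pattern above counts the success inside the try block and
the failure inside the catch, so exactly one counter moves per call. A generic
sketch of that pattern (class and names invented for illustration, not part of
this patch):

    import org.apache.hadoop.metrics2.lib.MutableCounterLong;

    final class CountedOperation {
      private final MutableCounterLong succeeded;
      private final MutableCounterLong failed;

      CountedOperation(MutableCounterLong succeeded, MutableCounterLong failed) {
        this.succeeded = succeeded;
        this.failed = failed;
      }

      // Exactly one of the two counters is incremented per invocation.
      void run(Runnable op) {
        try {
          op.run();
          succeeded.incr();
        } catch (RuntimeException e) {
          failed.incr();
          throw e;
        }
      }
    }
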
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java
new file mode 100644
index 0000000..30b1079
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeMetrics.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.node;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * This class maintains node-related metrics.
+ */
+@InterfaceAudience.Private
+@Metrics(about = "SCM NodeManager Metrics", context = "ozone")
+public final class SCMNodeMetrics {
+
+  private static final String SOURCE_NAME =
+      SCMNodeMetrics.class.getSimpleName();
+
+  private @Metric MutableCounterLong numHBProcessed;
+  private @Metric MutableCounterLong numHBProcessingFailed;
+  private @Metric MutableCounterLong numNodeReportProcessed;
+  private @Metric MutableCounterLong numNodeReportProcessingFailed;
+
+  /** Private constructor. */
+  private SCMNodeMetrics() { }
+
+  /**
+   * Creates and returns an SCMNodeMetrics instance.
+   *
+   * @return SCMNodeMetrics
+   */
+  public static SCMNodeMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME, "SCM NodeManager Metrics",
+        new SCMNodeMetrics());
+  }
+
+  /**
+   * Unregister the metrics instance.
+   */
+  public void unRegister() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+
+  /**
+   * Increments the count of heartbeats processed.
+   */
+  void incNumHBProcessed() {
+    numHBProcessed.incr();
+  }
+
+  /**
+   * Increments the count of heartbeat processing failures.
+   */
+  void incNumHBProcessingFailed() {
+    numHBProcessingFailed.incr();
+  }
+
+  /**
+   * Increments the count of node reports processed.
+   */
+  void incNumNodeReportProcessed() {
+    numNodeReportProcessed.incr();
+  }
+
+  /**
+   * Increments the count of node report processing failures.
+   */
+  void incNumNodeReportProcessingFailed() {
+    numNodeReportProcessingFailed.incr();
+  }
+
+}
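
The source name doubles as the registration key: each create() must be paired
with an unRegister() before another instance can be registered under the same
name, since DefaultMetricsSystem typically rejects duplicate source names
outside of mini-cluster mode. A lifecycle sketch mirroring the constructor and
close() wiring in SCMNodeManager above (the increment method is
package-private, hence the package declaration):

    package org.apache.hadoop.hdds.scm.node;

    public final class SCMNodeMetricsLifecycleSketch {
      public static void main(String[] args) {
        SCMNodeMetrics metrics = SCMNodeMetrics.create(); // registers the source
        try {
          metrics.incNumHBProcessed();
        } finally {
          metrics.unRegister(); // frees the source name for re-registration
        }
      }
    }
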
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index e8ecfa5..e360e7b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -31,7 +31,7 @@ import java.util.NavigableSet;
 /**
  * Interface which exposes the api for pipeline management.
  */
-public interface PipelineManager extends Closeable {
+public interface PipelineManager extends Closeable, PipelineManagerMXBean {
 
   Pipeline createPipeline(ReplicationType type, ReplicationFactor factor)
       throws IOException;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerMXBean.java
new file mode 100644
index 0000000..77a7a81
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerMXBean.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.util.Map;
+
+/**
+ * This is the JMX management interface for information related to
+ * PipelineManager.
+ */
+@InterfaceAudience.Private
+public interface PipelineManagerMXBean {
+
+  /**
+   * Returns the number of pipelines in each state.
+   * @return map from pipeline state to number of pipelines
+   */
+  Map<String, Integer> getPipelineInfo();
+
+}
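
Because PipelineManagerMXBean is an MXBean interface, JMX maps the
Map<String, Integer> return value to a TabularData whose rows are
CompositeData pairs with "key" and "value" items. A reader sketch against the
object name that SCMPipelineManager registers below:

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;
    import javax.management.openmbean.CompositeData;
    import javax.management.openmbean.TabularData;

    public final class PipelineInfoReader {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        ObjectName bean = new ObjectName(
            "Hadoop:service=SCMPipelineManager,name=SCMPipelineManagerInfo");
        TabularData data = (TabularData) mbs.getAttribute(bean, "PipelineInfo");
        for (Object row : data.values()) {
          CompositeData cd = (CompositeData) row;
          // Standard MXBean mapping of Map<K, V>: one row per entry.
          System.out.println(cd.get("key") + " = " + cd.get("value"));
        }
      }
    }
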
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 193d98f..3716607 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.utils.MetadataKeyFilters;
 import org.apache.hadoop.utils.MetadataStore;
@@ -36,8 +37,10 @@ import org.apache.hadoop.utils.MetadataStoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.management.ObjectName;
 import java.io.File;
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -68,6 +71,9 @@ public class SCMPipelineManager implements PipelineManager {
 
   private final EventPublisher eventPublisher;
   private final NodeManager nodeManager;
+  private final SCMPipelineMetrics metrics;
+  // Pipeline Manager MXBean
+  private ObjectName pmInfoBean;
 
   public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
       EventPublisher eventPublisher) throws IOException {
@@ -87,6 +93,9 @@ public class SCMPipelineManager implements PipelineManager {
             .build();
     this.eventPublisher = eventPublisher;
     this.nodeManager = nodeManager;
+    this.metrics = SCMPipelineMetrics.create();
+    this.pmInfoBean = MBeans.register("SCMPipelineManager",
+        "SCMPipelineManagerInfo", this);
     initializePipelineState();
   }
 
@@ -115,12 +124,16 @@ public class SCMPipelineManager implements PipelineManager {
       ReplicationType type, ReplicationFactor factor) throws IOException {
     lock.writeLock().lock();
     try {
-      Pipeline pipeline =  pipelineFactory.create(type, factor);
+      Pipeline pipeline = pipelineFactory.create(type, factor);
       pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
           pipeline.getProtobufMessage().toByteArray());
       stateManager.addPipeline(pipeline);
       nodeManager.addPipeline(pipeline);
+      metrics.incNumPipelineCreated();
       return pipeline;
+    } catch (IOException ex) {
+      metrics.incNumPipelineCreationFailed();
+      throw ex;
     } finally {
       lock.writeLock().unlock();
     }
@@ -130,6 +143,7 @@ public class SCMPipelineManager implements PipelineManager {
   public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
                                  List<DatanodeDetails> nodes) {
     // This will mostly be used to create dummy pipeline for SimplePipelines.
+    // We don't update the metrics for SimplePipelines.
     lock.writeLock().lock();
     try {
       return pipelineFactory.create(type, factor, nodes);
@@ -260,12 +274,28 @@ public class SCMPipelineManager implements PipelineManager {
       pipelineStore.delete(pipelineID.getProtobuf().toByteArray());
       Pipeline pipeline = stateManager.removePipeline(pipelineID);
       nodeManager.removePipeline(pipeline);
+      metrics.incNumPipelineDestroyed();
+    } catch (IOException ex) {
+      metrics.incNumPipelineDestroyFailed();
+      throw ex;
     } finally {
       lock.writeLock().unlock();
     }
   }
 
   @Override
+  public Map<String, Integer> getPipelineInfo() {
+    final Map<String, Integer> pipelineInfo = new HashMap<>();
+    for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
+      pipelineInfo.put(state.toString(), 0);
+    }
+    stateManager.getPipelines().forEach(pipeline ->
+        pipelineInfo.computeIfPresent(
+            pipeline.getPipelineState().toString(), (k, v) -> v + 1));
+    return pipelineInfo;
+  }
+
+  @Override
   public void close() throws IOException {
     if (pipelineFactory != null) {
       pipelineFactory.close();
@@ -274,5 +304,12 @@ public class SCMPipelineManager implements PipelineManager {
     if (pipelineStore != null) {
       pipelineStore.close();
     }
+    if (pmInfoBean != null) {
+      MBeans.unregister(this.pmInfoBean);
+      pmInfoBean = null;
+    }
+    if (metrics != null) {
+      metrics.unRegister();
+    }
   }
 }
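
getPipelineInfo() pre-seeds every PipelineState with zero before counting, so
states with no pipelines still appear in the JMX view; a plain groupingBy
would silently drop them. A self-contained sketch of that design choice (enum
and data invented for illustration):

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public final class StateCountSketch {
      enum State { ALLOCATED, OPEN, CLOSED }

      public static void main(String[] args) {
        List<State> pipelines = Arrays.asList(State.OPEN, State.OPEN, State.CLOSED);

        // Pattern used by getPipelineInfo(): every state reported, even at zero.
        Map<String, Integer> seeded = new HashMap<>();
        for (State s : State.values()) {
          seeded.put(s.toString(), 0);
        }
        pipelines.forEach(s ->
            seeded.computeIfPresent(s.toString(), (k, v) -> v + 1));
        System.out.println(seeded); // e.g. {OPEN=2, CLOSED=1, ALLOCATED=0}

        // groupingBy alone drops ALLOCATED entirely.
        Map<String, Long> grouped = pipelines.stream()
            .collect(Collectors.groupingBy(State::toString, Collectors.counting()));
        System.out.println(grouped); // e.g. {OPEN=2, CLOSED=1}
      }
    }
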
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
new file mode 100644
index 0000000..d1ae90e
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.metrics2.MetricsSystem;
+import org.apache.hadoop.metrics2.annotation.Metric;
+import org.apache.hadoop.metrics2.annotation.Metrics;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.lib.MutableCounterLong;
+
+/**
+ * This class maintains pipeline-related metrics.
+ */
+@InterfaceAudience.Private
+@Metrics(about = "SCM PipelineManager Metrics", context = "ozone")
+public final class SCMPipelineMetrics {
+
+  private static final String SOURCE_NAME =
+      SCMPipelineMetrics.class.getSimpleName();
+
+  private @Metric MutableCounterLong numPipelineCreated;
+  private @Metric MutableCounterLong numPipelineCreationFailed;
+  private @Metric MutableCounterLong numPipelineDestroyed;
+  private @Metric MutableCounterLong numPipelineDestroyFailed;
+  private @Metric MutableCounterLong numPipelineReportProcessed;
+  private @Metric MutableCounterLong numPipelineReportProcessingFailed;
+
+  /** Private constructor. */
+  private SCMPipelineMetrics() { }
+
+  /**
+   * Creates and returns an SCMPipelineMetrics instance.
+   *
+   * @return SCMPipelineMetrics
+   */
+  public static SCMPipelineMetrics create() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    return ms.register(SOURCE_NAME, "SCM PipelineManager Metrics",
+        new SCMPipelineMetrics());
+  }
+
+  /**
+   * Unregister the metrics instance.
+   */
+  public void unRegister() {
+    MetricsSystem ms = DefaultMetricsSystem.instance();
+    ms.unregisterSource(SOURCE_NAME);
+  }
+
+  /**
+   * Increments the count of successful pipeline creations.
+   */
+  void incNumPipelineCreated() {
+    numPipelineCreated.incr();
+  }
+
+  /**
+   * Increments the count of failed pipeline creations.
+   */
+  void incNumPipelineCreationFailed() {
+    numPipelineCreationFailed.incr();
+  }
+
+  /**
+   * Increments the count of pipelines destroyed.
+   */
+  void incNumPipelineDestroyed() {
+    numPipelineDestroyed.incr();
+  }
+
+  /**
+   * Increments the count of failed pipeline destroy attempts.
+   */
+  void incNumPipelineDestroyFailed() {
+    numPipelineDestroyFailed.incr();
+  }
+
+  /**
+   * Increments the count of pipeline reports processed.
+   */
+  void incNumPipelineReportProcessed() {
+    numPipelineReportProcessed.incr();
+  }
+
+  /**
+   * Increments the count of pipeline report processing failures.
+   */
+  void incNumPipelineReportProcessingFailed() {
+    numPipelineReportProcessingFailed.incr();
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
new file mode 100644
index 0000000..c18ae5f
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.scm.node;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.StorageReportProto;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.node.SCMNodeMetrics;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.ozone.HddsDatanodeService;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
+/**
+ * Test cases to verify the metrics exposed by SCMNodeManager.
+ */
+public class TestSCMNodeMetrics {
+
+  private MiniOzoneCluster cluster;
+
+  @Before
+  public void setup() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    cluster = MiniOzoneCluster.newBuilder(conf).build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  /**
+   * Verifies heartbeat processing count.
+   *
+   * @throws InterruptedException if the wait for heartbeat processing is interrupted
+   */
+  @Test
+  public void testHBProcessing() throws InterruptedException {
+    MetricsRecordBuilder metrics = getMetrics(
+        SCMNodeMetrics.class.getSimpleName());
+    long hbProcessed = getLongCounter("NumHBProcessed", metrics);
+    cluster.getHddsDatanodes().get(0)
+        .getDatanodeStateMachine().triggerHeartbeat();
+    // Give some time so that SCM receives and processes the heartbeat.
+    Thread.sleep(100L);
+    assertCounter("NumHBProcessed", hbProcessed + 1,
+        getMetrics(SCMNodeMetrics.class.getSimpleName()));
+  }
+
+  /**
+   * Verifies heartbeat processing failure count.
+   */
+  @Test
+  public void testHBProcessingFailure() {
+    MetricsRecordBuilder metrics = getMetrics(
+        SCMNodeMetrics.class.getSimpleName());
+    long hbProcessedFailed = getLongCounter("NumHBProcessingFailed", metrics);
+    cluster.getStorageContainerManager().getScmNodeManager()
+        .processHeartbeat(TestUtils.randomDatanodeDetails());
+    assertCounter("NumHBProcessingFailed", hbProcessedFailed + 1,
+        getMetrics(SCMNodeMetrics.class.getSimpleName()));
+  }
+
+  /**
+   * Verifies node report processing count.
+   *
+   * @throws InterruptedException if the wait for report processing is interrupted
+   */
+  @Test
+  public void testNodeReportProcessing() throws InterruptedException {
+    MetricsRecordBuilder metrics = getMetrics(
+        SCMNodeMetrics.class.getSimpleName());
+    long nrProcessed = getLongCounter("NumNodeReportProcessed", metrics);
+    HddsDatanodeService datanode = cluster.getHddsDatanodes().get(0);
+    StorageReportProto storageReport = TestUtils.createStorageReport(
+        datanode.getDatanodeDetails().getUuid(), "/tmp", 100, 10, 90, null);
+    NodeReportProto nodeReport = NodeReportProto.newBuilder()
+        .addStorageReport(storageReport).build();
+    datanode.getDatanodeStateMachine().getContext().addReport(nodeReport);
+    datanode.getDatanodeStateMachine().triggerHeartbeat();
+    // Give some time so that SCM receives and processes the heartbeat.
+    Thread.sleep(100L);
+    assertCounter("NumNodeReportProcessed", nrProcessed + 1,
+        getMetrics(SCMNodeMetrics.class.getSimpleName()));
+  }
+
+  /**
+   * Verifies node report processing failure count.
+   */
+  @Test
+  public void testNodeReportProcessingFailure() {
+    MetricsRecordBuilder metrics = getMetrics(
+        SCMNodeMetrics.class.getSimpleName());
+    long nrProcessingFailed = getLongCounter("NumNodeReportProcessingFailed",
+        metrics);
+    DatanodeDetails datanode = TestUtils.randomDatanodeDetails();
+    StorageReportProto storageReport = TestUtils.createStorageReport(
+        datanode.getUuid(), "/tmp", 100, 10, 90, null);
+    NodeReportProto nodeReport = NodeReportProto.newBuilder()
+        .addStorageReport(storageReport).build();
+
+    cluster.getStorageContainerManager().getScmNodeManager()
+        .processNodeReport(datanode, nodeReport);
+    assertCounter("NumNodeReportProcessingFailed", nrProcessed + 1,
+        getMetrics(SCMNodeMetrics.class.getSimpleName()));
+  }
+
+  @After
+  public void teardown() {
+    cluster.shutdown();
+  }
+}
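
The fixed Thread.sleep(100L) in the two positive tests leaves them
timing-sensitive; polling the counter until it advances is more robust. A
sketch using GenericTestUtils.waitFor, assuming the usual
org.apache.hadoop.test.GenericTestUtils helper is on the test classpath:

    import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
    import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

    import org.apache.hadoop.hdds.scm.node.SCMNodeMetrics;
    import org.apache.hadoop.test.GenericTestUtils;

    final class HeartbeatWait {
      // Poll until the counter advances instead of sleeping a fixed 100 ms.
      static void awaitHeartbeatProcessed(long before) throws Exception {
        GenericTestUtils.waitFor(
            () -> getLongCounter("NumHBProcessed",
                getMetrics(SCMNodeMetrics.class.getSimpleName())) > before,
            100, 5000); // check every 100 ms, give up after 5 s
      }
    }
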
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/package-info.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/package-info.java
new file mode 100644
index 0000000..7ac6d18
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * <p>
+ *
+ */
+
+/**
+ * Unit tests for Node related functions in SCM.
+ */
+package org.apache.hadoop.ozone.scm.node;
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestPipelineManagerMXBean.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestPipelineManagerMXBean.java
new file mode 100644
index 0000000..cdc9f0f
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestPipelineManagerMXBean.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import javax.management.MBeanServer;
+import javax.management.ObjectName;
+import javax.management.openmbean.CompositeData;
+import javax.management.openmbean.TabularData;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.concurrent.TimeoutException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Test cases to verify the metrics exposed by SCMPipelineManager via MXBean.
+ */
+public class TestPipelineManagerMXBean {
+
+  private MiniOzoneCluster cluster;
+  private static MBeanServer mbs;
+
+  @Before
+  public void init()
+      throws IOException, TimeoutException, InterruptedException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    cluster = MiniOzoneCluster.newBuilder(conf).build();
+    cluster.waitForClusterToBeReady();
+    mbs = ManagementFactory.getPlatformMBeanServer();
+  }
+
+  /**
+   * Verifies SCMPipelineManagerInfo metrics.
+   *
+   * @throws Exception if the MBean lookup or attribute read fails
+   */
+  @Test
+  public void testPipelineInfo() throws Exception {
+    ObjectName bean = new ObjectName(
+        "Hadoop:service=SCMPipelineManager,name=SCMPipelineManagerInfo");
+
+    TabularData data = (TabularData) mbs.getAttribute(bean, "PipelineInfo");
+    Map<String, Integer> datanodeInfo = cluster.getStorageContainerManager()
+        .getPipelineManager().getPipelineInfo();
+    verifyEquals(data, datanodeInfo);
+  }
+
+  private void verifyEquals(TabularData actualData, Map<String, Integer>
+      expectedData) {
+    if (actualData == null || expectedData == null) {
+      fail("Data should not be null.");
+    }
+    for (Object obj : actualData.values()) {
+      assertTrue(obj instanceof CompositeData);
+      CompositeData cds = (CompositeData) obj;
+      assertEquals(2, cds.values().size());
+      Iterator<?> it = cds.values().iterator();
+      String key = it.next().toString();
+      String value = it.next().toString();
+      long num = Long.parseLong(value);
+      assertTrue(expectedData.containsKey(key));
+      assertEquals(expectedData.remove(key).longValue(), num);
+    }
+    assertTrue(expectedData.isEmpty());
+  }
+
+  @After
+  public void teardown() {
+    cluster.shutdown();
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java
new file mode 100644
index 0000000..1583952
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineMetrics;
+import org.apache.hadoop.metrics2.MetricsRecordBuilder;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
+import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
+
+/**
+ * Test cases to verify the metrics exposed by SCMPipelineManager.
+ */
+public class TestSCMPipelineMetrics {
+
+  private MiniOzoneCluster cluster;
+
+  @Before
+  public void setup() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+  }
+
+  /**
+   * Verifies pipeline creation metric.
+   */
+  @Test
+  public void testPipelineCreation() {
+    MetricsRecordBuilder metrics = getMetrics(
+        SCMPipelineMetrics.class.getSimpleName());
+    long numPipelineCreated = getLongCounter("NumPipelineCreated", metrics);
+    // Pipelines are created in background when the cluster starts.
+    Assert.assertTrue(numPipelineCreated > 0);
+  }
+
+  /**
+   * Verifies pipeline destroy metric.
+   */
+  @Test
+  public void testPipelineDestroy() {
+    PipelineManager pipelineManager = cluster
+        .getStorageContainerManager().getPipelineManager();
+    Optional<Pipeline> pipeline = pipelineManager
+        .getPipelines().stream().findFirst();
+    Assert.assertTrue(pipeline.isPresent());
+    pipeline.ifPresent(pipeline1 -> {
+      try {
+        cluster.getStorageContainerManager()
+            .getClientProtocolServer().closePipeline(
+                pipeline1.getId().getProtobuf());
+      } catch (IOException e) {
+        e.printStackTrace();
+        Assert.fail("closePipeline failed: " + e.getMessage());
+      }
+    });
+    MetricsRecordBuilder metrics = getMetrics(
+        SCMPipelineMetrics.class.getSimpleName());
+    assertCounter("NumPipelineDestroyed", 1L, metrics);
+  }
+
+
+  @After
+  public void teardown() {
+    cluster.shutdown();
+  }
+}
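
testPipelineDestroy() asserts the absolute value 1L, which holds only because
nothing else destroys a pipeline before it runs; a delta-based assertion in
the style of TestSCMNodeMetrics above would be less order-dependent. A sketch
(helper class and names invented):

    import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
    import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
    import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

    import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineMetrics;

    final class DestroyedCounterDelta {
      // Usage: long before = snapshot(); <close the pipeline>; assertBumped(before);
      static long snapshot() {
        return getLongCounter("NumPipelineDestroyed",
            getMetrics(SCMPipelineMetrics.class.getSimpleName()));
      }

      static void assertBumped(long before) {
        assertCounter("NumPipelineDestroyed", before + 1,
            getMetrics(SCMPipelineMetrics.class.getSimpleName()));
      }
    }
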
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/package-info.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/package-info.java
new file mode 100644
index 0000000..ea6734a
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * <p>
+ *
+ */
+
+/**
+ * Unit tests for Pipeline related functions in SCM.
+ */
+package org.apache.hadoop.ozone.scm.pipeline;
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org