Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2019/02/08 15:51:08 UTC

[hadoop] branch trunk updated: HDDS-1048. Remove SCMNodeStat from SCMNodeManager and use storage information from DatanodeInfo#StorageReportProto. Contributed by Nanda kumar.

This is an automated email from the ASF dual-hosted git repository.

yqlin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fb8c997  HDDS-1048. Remove SCMNodeStat from SCMNodeManager and use storage information from DatanodeInfo#StorageReportProto. Contributed by Nanda kumar.
fb8c997 is described below

commit fb8c997a6884bbe19c45bab77950068bb78109c7
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Fri Feb 8 23:49:37 2019 +0800

    HDDS-1048. Remove SCMNodeStat from SCMNodeManager and use storage information from DatanodeInfo#StorageReportProto. Contributed by Nanda kumar.
---
 .../apache/hadoop/hdds/scm/node/DatanodeInfo.java  |   7 +-
 .../hadoop/hdds/scm/node/DeadNodeHandler.java      |   1 -
 .../apache/hadoop/hdds/scm/node/NodeManager.java   |  15 +-
 .../hadoop/hdds/scm/node/NodeStateManager.java     |  65 ++------
 .../hadoop/hdds/scm/node/SCMNodeManager.java       | 169 ++++++++++-----------
 .../hadoop/hdds/scm/node/states/NodeStateMap.java  |  58 -------
 .../hadoop/hdds/scm/container/MockNodeManager.java |  31 +---
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |  28 ++--
 .../hdds/scm/node/TestNodeReportHandler.java       |   3 +-
 .../testutils/ReplicationNodeManagerMock.java      |  11 +-
 10 files changed, 123 insertions(+), 265 deletions(-)
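
In short, SCMNodeManager and NodeStateMap stop maintaining a separate per-node SCMNodeStat; node storage statistics are now derived on demand from the StorageReportProto list cached on DatanodeInfo. A minimal sketch of that aggregation, mirroring the new SCMNodeManager#getNodeStatInternal in the diff below (the class and method names are illustrative only, and the NodeNotFoundException handling is omitted):

    import java.util.List;

    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.StorageReportProto;
    import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
    import org.apache.hadoop.hdds.scm.node.DatanodeInfo;

    /** Illustration only: derive a node's stats from its cached storage reports. */
    final class NodeStatSketch {
      static SCMNodeStat statFromReports(DatanodeInfo datanodeInfo) {
        long capacity = 0L;
        long used = 0L;
        long remaining = 0L;
        // Sum capacity/used/remaining across all storage locations reported
        // by the datanode in its latest node report.
        List<StorageReportProto> reports = datanodeInfo.getStorageReports();
        for (StorageReportProto report : reports) {
          capacity += report.getCapacity();
          used += report.getScmUsed();
          remaining += report.getRemaining();
        }
        return new SCMNodeStat(capacity, used, remaining);
      }
    }

The same loop backs both getNodeStat(DatanodeDetails) and the cluster-wide getStats(), which is why the aggregated scmStat field and the processDeadNode() bookkeeping can be removed.
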

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
index 26b8b95..d06ea2a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.util.Time;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -38,7 +39,6 @@ public class DatanodeInfo extends DatanodeDetails {
   private volatile long lastHeartbeatTime;
   private long lastStatsUpdatedTime;
 
-  // If required we can dissect StorageReportProto and store the raw data
   private List<StorageReportProto> storageReports;
 
   /**
@@ -48,8 +48,9 @@ public class DatanodeInfo extends DatanodeDetails {
    */
   public DatanodeInfo(DatanodeDetails datanodeDetails) {
     super(datanodeDetails);
-    lock = new ReentrantReadWriteLock();
-    lastHeartbeatTime = Time.monotonicNow();
+    this.lock = new ReentrantReadWriteLock();
+    this.lastHeartbeatTime = Time.monotonicNow();
+    this.storageReports = Collections.emptyList();
   }
 
   /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index 8e71399..a75a51a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -58,7 +58,6 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
       EventPublisher publisher) {
-    nodeManager.processDeadNode(datanodeDetails.getUuid());
 
     // TODO: check if there are any pipeline on this node and fire close
     // pipeline event
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index d8865a8..6b8d477 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -93,8 +93,7 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    * Return a map of node stats.
    * @return a map of individual node stats (live/stale but not dead).
    */
-  // TODO: try to change the return type to Map<DatanodeDetails, SCMNodeStat>
-  Map<UUID, SCMNodeStat> getNodeStats();
+  Map<DatanodeDetails, SCMNodeStat> getNodeStats();
 
   /**
    * Return the node stat of the specified datanode.
@@ -159,17 +158,11 @@ public interface NodeManager extends StorageContainerNodeProtocol,
   /**
    * Process node report.
    *
-   * @param dnUuid
+   * @param datanodeDetails
    * @param nodeReport
    */
-  void processNodeReport(DatanodeDetails dnUuid, NodeReportProto nodeReport);
-
-  /**
-   * Process a dead node event in this Node Manager.
-   *
-   * @param dnUuid datanode uuid.
-   */
-  void processDeadNode(UUID dnUuid);
+  void processNodeReport(DatanodeDetails datanodeDetails,
+                         NodeReportProto nodeReport);
 
   /**
    * Get list of SCMCommands in the Command Queue for a particular Datanode.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index 5ff9dfa..c54944b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.states.*;
 import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
@@ -276,20 +275,6 @@ public class NodeStateManager implements Runnable, Closeable {
   }
 
   /**
-   * Get information about the node.
-   *
-   * @param datanodeUUID datanode UUID
-   *
-   * @return DatanodeInfo
-   *
-   * @throws NodeNotFoundException if the node is not present
-   */
-  public DatanodeInfo getNode(UUID datanodeUUID)
-      throws NodeNotFoundException {
-    return nodeStateMap.getNodeInfo(datanodeUUID);
-  }
-
-  /**
    * Updates the last heartbeat time of the node.
    *
    * @throws NodeNotFoundException if the node is not present
@@ -319,7 +304,7 @@ public class NodeStateManager implements Runnable, Closeable {
    *
    * @return list of healthy nodes
    */
-  public List<DatanodeDetails> getHealthyNodes() {
+  public List<DatanodeInfo> getHealthyNodes() {
     return getNodes(NodeState.HEALTHY);
   }
 
@@ -328,7 +313,7 @@ public class NodeStateManager implements Runnable, Closeable {
    *
    * @return list of stale nodes
    */
-  public List<DatanodeDetails> getStaleNodes() {
+  public List<DatanodeInfo> getStaleNodes() {
     return getNodes(NodeState.STALE);
   }
 
@@ -337,7 +322,7 @@ public class NodeStateManager implements Runnable, Closeable {
    *
    * @return list of dead nodes
    */
-  public List<DatanodeDetails> getDeadNodes() {
+  public List<DatanodeInfo> getDeadNodes() {
     return getNodes(NodeState.DEAD);
   }
 
@@ -348,12 +333,12 @@ public class NodeStateManager implements Runnable, Closeable {
    *
    * @return list of nodes
    */
-  public List<DatanodeDetails> getNodes(NodeState state) {
-    List<DatanodeDetails> nodes = new ArrayList<>();
+  public List<DatanodeInfo> getNodes(NodeState state) {
+    List<DatanodeInfo> nodes = new ArrayList<>();
     nodeStateMap.getNodes(state).forEach(
         uuid -> {
           try {
-            nodes.add(nodeStateMap.getNodeDetails(uuid));
+            nodes.add(nodeStateMap.getNodeInfo(uuid));
           } catch (NodeNotFoundException e) {
             // This should not happen unless someone else other than
             // NodeStateManager is directly modifying NodeStateMap and removed
@@ -369,12 +354,12 @@ public class NodeStateManager implements Runnable, Closeable {
    *
    * @return all the managed nodes
    */
-  public List<DatanodeDetails> getAllNodes() {
-    List<DatanodeDetails> nodes = new ArrayList<>();
+  public List<DatanodeInfo> getAllNodes() {
+    List<DatanodeInfo> nodes = new ArrayList<>();
     nodeStateMap.getAllNodes().forEach(
         uuid -> {
           try {
-            nodes.add(nodeStateMap.getNodeDetails(uuid));
+            nodes.add(nodeStateMap.getNodeInfo(uuid));
           } catch (NodeNotFoundException e) {
             // This should not happen unless someone else other than
             // NodeStateManager is directly modifying NodeStateMap and removed
@@ -442,38 +427,6 @@ public class NodeStateManager implements Runnable, Closeable {
   }
 
   /**
-   * Returns the current stats of the node.
-   *
-   * @param uuid node id
-   *
-   * @return SCMNodeStat
-   *
-   * @throws NodeNotFoundException if the node is not present
-   */
-  public SCMNodeStat getNodeStat(UUID uuid) throws NodeNotFoundException {
-    return nodeStateMap.getNodeStat(uuid);
-  }
-
-  /**
-   * Returns a unmodifiable copy of nodeStats.
-   * @return map with node stats.
-   */
-  public Map<UUID, SCMNodeStat> getNodeStatsMap() {
-    return nodeStateMap.getNodeStats();
-  }
-
-  /**
-   * Set the stat for the node.
-   *
-   * @param uuid node id.
-   *
-   * @param newstat new stat that will set to the specify node.
-   */
-  public void setNodeStat(UUID uuid, SCMNodeStat newstat) {
-    nodeStateMap.setNodeStat(uuid, newstat);
-  }
-
-  /**
    * Removes a pipeline from the node2PipelineMap.
    * @param pipeline - Pipeline to be removed
    */
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 61ddb5b..3c5eaf8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -19,7 +19,8 @@ package org.apache.hadoop.hdds.scm.node;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -65,6 +66,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
 
 /**
  * Maintains information about the Datanodes on SCM side.
@@ -80,18 +82,13 @@ import java.util.UUID;
  * get functions in this file as a snap-shot of information that is inconsistent
  * as soon as you read it.
  */
-public class SCMNodeManager
-    implements NodeManager, StorageContainerNodeProtocol {
+public class SCMNodeManager implements NodeManager {
 
   @VisibleForTesting
   static final Logger LOG =
       LoggerFactory.getLogger(SCMNodeManager.class);
 
   private final NodeStateManager nodeStateManager;
-  // Should we maintain aggregated stats? If this is not frequently used, we
-  // can always calculate it from nodeStats whenever required.
-  // Aggregated node stats
-  private SCMNodeStat scmStat;
   private final String clusterID;
   private final VersionInfo version;
   private final CommandQueue commandQueue;
@@ -108,7 +105,6 @@ public class SCMNodeManager
       StorageContainerManager scmManager, EventPublisher eventPublisher)
       throws IOException {
     this.nodeStateManager = new NodeStateManager(conf, eventPublisher);
-    this.scmStat = new SCMNodeStat();
     this.clusterID = clusterID;
     this.version = VersionInfo.getLatestVersion();
     this.commandQueue = new CommandQueue();
@@ -131,7 +127,7 @@ public class SCMNodeManager
 
 
   /**
-   * Gets all datanodes that are in a certain state. This function works by
+   * Returns all datanodes that are in the given state. This function works by
    * taking a snapshot of the current collection and then returning the list
    * from that collection. This means that real map might have changed by the
    * time we return this list.
@@ -140,7 +136,8 @@ public class SCMNodeManager
    */
   @Override
   public List<DatanodeDetails> getNodes(NodeState nodestate) {
-    return nodeStateManager.getNodes(nodestate);
+    return nodeStateManager.getNodes(nodestate).stream()
+        .map(node -> (DatanodeDetails)node).collect(Collectors.toList());
   }
 
   /**
@@ -150,13 +147,14 @@ public class SCMNodeManager
    */
   @Override
   public List<DatanodeDetails> getAllNodes() {
-    return nodeStateManager.getAllNodes();
+    return nodeStateManager.getAllNodes().stream()
+        .map(node -> (DatanodeDetails)node).collect(Collectors.toList());
   }
 
   /**
    * Returns the Number of Datanodes by State they are in.
    *
-   * @return int -- count
+   * @return count
    */
   @Override
   public int getNodeCount(NodeState nodestate) {
@@ -166,7 +164,7 @@ public class SCMNodeManager
   /**
    * Returns the node state of a specific node.
    *
-   * @param datanodeDetails - Datanode Details
+   * @param datanodeDetails Datanode Details
    * @return Healthy/Stale/Dead/Unknown.
    */
   @Override
@@ -179,47 +177,6 @@ public class SCMNodeManager
     }
   }
 
-
-  private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) {
-    SCMNodeStat stat;
-    try {
-      stat = nodeStateManager.getNodeStat(dnId);
-
-      // Updating the storage report for the datanode.
-      // I dont think we will get NotFound exception, as we are taking
-      // nodeInfo from nodeStateMap, as I see it is not being removed from
-      // the map, just we change the states. And during first time
-      // registration we call this, after adding to nodeStateMap. And also
-      // from eventhandler it is called only if it has node Report.
-      DatanodeInfo datanodeInfo = nodeStateManager.getNode(dnId);
-      if (nodeReport != null) {
-        datanodeInfo.updateStorageReports(nodeReport.getStorageReportList());
-      }
-
-    } catch (NodeNotFoundException e) {
-      LOG.debug("SCM updateNodeStat based on heartbeat from previous " +
-          "dead datanode {}", dnId);
-      stat = new SCMNodeStat();
-    }
-
-    if (nodeReport != null && nodeReport.getStorageReportCount() > 0) {
-      long totalCapacity = 0;
-      long totalRemaining = 0;
-      long totalScmUsed = 0;
-      List<StorageReportProto> storageReports = nodeReport
-          .getStorageReportList();
-      for (StorageReportProto report : storageReports) {
-        totalCapacity += report.getCapacity();
-        totalRemaining +=  report.getRemaining();
-        totalScmUsed+= report.getScmUsed();
-      }
-      scmStat.subtract(stat);
-      stat.set(totalCapacity, totalScmUsed, totalRemaining);
-      scmStat.add(stat);
-    }
-    nodeStateManager.setNodeStat(dnId, stat);
-  }
-
   /**
    * Closes this stream and releases any system resources associated with it. If
    * the stream is already closed then invoking this method has no effect.
@@ -275,7 +232,7 @@ public class SCMNodeManager
     try {
       nodeStateManager.addNode(datanodeDetails);
       // Updating Node Report, as registration is successful
-      updateNodeStat(datanodeDetails.getUuid(), nodeReport);
+      processNodeReport(datanodeDetails, nodeReport);
       LOG.info("Registered Data node : {}", datanodeDetails);
     } catch (NodeAlreadyExistsException e) {
       LOG.trace("Datanode is already registered. Datanode: {}",
@@ -321,13 +278,21 @@ public class SCMNodeManager
   /**
    * Process node report.
    *
-   * @param dnUuid
+   * @param datanodeDetails
    * @param nodeReport
    */
   @Override
-  public void processNodeReport(DatanodeDetails dnUuid,
+  public void processNodeReport(DatanodeDetails datanodeDetails,
                                 NodeReportProto nodeReport) {
-    this.updateNodeStat(dnUuid.getUuid(), nodeReport);
+    try {
+      DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails);
+      if (nodeReport != null) {
+        datanodeInfo.updateStorageReports(nodeReport.getStorageReportList());
+      }
+    } catch (NodeNotFoundException e) {
+      LOG.warn("Got node report from unregistered datanode {}",
+          datanodeDetails);
+    }
   }
 
   /**
@@ -336,7 +301,16 @@ public class SCMNodeManager
    */
   @Override
   public SCMNodeStat getStats() {
-    return new SCMNodeStat(this.scmStat);
+    long capacity = 0L;
+    long used = 0L;
+    long remaining = 0L;
+
+    for (SCMNodeStat stat : getNodeStats().values()) {
+      capacity += stat.getCapacity().get();
+      used += stat.getScmUsed().get();
+      remaining += stat.getRemaining().get();
+    }
+    return new SCMNodeStat(capacity, used, remaining);
   }
 
   /**
@@ -344,8 +318,24 @@ public class SCMNodeManager
    * @return a map of individual node stats (live/stale but not dead).
    */
   @Override
-  public Map<UUID, SCMNodeStat> getNodeStats() {
-    return nodeStateManager.getNodeStatsMap();
+  public Map<DatanodeDetails, SCMNodeStat> getNodeStats() {
+
+    final Map<DatanodeDetails, SCMNodeStat> nodeStats = new HashMap<>();
+
+    final List<DatanodeInfo> healthyNodes =  nodeStateManager
+        .getNodes(NodeState.HEALTHY);
+    final List<DatanodeInfo> staleNodes = nodeStateManager
+        .getNodes(NodeState.STALE);
+    final List<DatanodeInfo> datanodes = new ArrayList<>(healthyNodes);
+    datanodes.addAll(staleNodes);
+
+    for (DatanodeInfo dnInfo : datanodes) {
+      SCMNodeStat nodeStat = getNodeStatInternal(dnInfo);
+      if (nodeStat != null) {
+        nodeStats.put(dnInfo, nodeStat);
+      }
+    }
+    return nodeStats;
   }
 
   /**
@@ -356,11 +346,28 @@ public class SCMNodeManager
    */
   @Override
   public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) {
+    final SCMNodeStat nodeStat = getNodeStatInternal(datanodeDetails);
+    return nodeStat != null ? new SCMNodeMetric(nodeStat) : null;
+  }
+
+  private SCMNodeStat getNodeStatInternal(DatanodeDetails datanodeDetails) {
     try {
-      return new SCMNodeMetric(
-          nodeStateManager.getNodeStat(datanodeDetails.getUuid()));
+      long capacity = 0L;
+      long used = 0L;
+      long remaining = 0L;
+
+      final DatanodeInfo datanodeInfo = nodeStateManager
+          .getNode(datanodeDetails);
+      final List<StorageReportProto> storageReportProtos = datanodeInfo
+          .getStorageReports();
+      for (StorageReportProto reportProto : storageReportProtos) {
+        capacity += reportProto.getCapacity();
+        used += reportProto.getScmUsed();
+        remaining += reportProto.getRemaining();
+      }
+      return new SCMNodeStat(capacity, used, remaining);
     } catch (NodeNotFoundException e) {
-      LOG.info("SCM getNodeStat from a decommissioned or removed datanode {}",
+      LOG.warn("Cannot generate NodeStat, datanode {} not found.",
           datanodeDetails.getUuid());
       return null;
     }
@@ -375,6 +382,8 @@ public class SCMNodeManager
     return nodeCountMap;
   }
 
+  // We should introduce the notion of DISK, SSD, etc. in
+  // SCMNodeStat and try to use it.
   @Override
   public Map<String, Long> getNodeInfo() {
     long diskCapacity = 0L;
@@ -385,14 +394,15 @@ public class SCMNodeManager
     long ssdUsed = 0L;
     long ssdRemaining = 0L;
 
-    List<DatanodeDetails> healthyNodes =  getNodes(NodeState.HEALTHY);
-    List<DatanodeDetails> staleNodes = getNodes(NodeState.STALE);
+    List<DatanodeInfo> healthyNodes =  nodeStateManager
+        .getNodes(NodeState.HEALTHY);
+    List<DatanodeInfo> staleNodes = nodeStateManager
+        .getNodes(NodeState.STALE);
 
-    List<DatanodeDetails> datanodes = new ArrayList<>(healthyNodes);
+    List<DatanodeInfo> datanodes = new ArrayList<>(healthyNodes);
     datanodes.addAll(staleNodes);
 
-    for (DatanodeDetails datanodeDetails : datanodes) {
-      DatanodeInfo dnInfo = (DatanodeInfo) datanodeDetails;
+    for (DatanodeInfo dnInfo : datanodes) {
       List<StorageReportProto> storageReportProtos = dnInfo.getStorageReports();
       for (StorageReportProto reportProto : storageReportProtos) {
         if (reportProto.getStorageType() ==
@@ -498,27 +508,6 @@ public class SCMNodeManager
         commandForDatanode.getCommand());
   }
 
-  /**
-   * Update the node stats and cluster storage stats in this SCM Node Manager.
-   *
-   * @param dnUuid datanode uuid.
-   */
-  @Override
-  // TODO: This should be removed.
-  public void processDeadNode(UUID dnUuid) {
-    try {
-      SCMNodeStat stat = nodeStateManager.getNodeStat(dnUuid);
-      if (stat != null) {
-        LOG.trace("Update stat values as Datanode {} is dead.", dnUuid);
-        scmStat.subtract(stat);
-        stat.set(0, 0, 0);
-      }
-    } catch (NodeNotFoundException e) {
-      LOG.warn("Can't update stats based on message of dead Datanode {}, it"
-          + " doesn't exist or decommissioned already.", dnUuid);
-    }
-  }
-
   @Override
   public List<SCMCommand> getCommandQueue(UUID dnID) {
     return commandQueue.getCommand(dnID);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
index a68e2b5..fd87cca 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdds.scm.node.states;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
 import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
 
 import java.util.*;
@@ -46,10 +45,6 @@ public class NodeStateMap {
    */
   private final ConcurrentHashMap<NodeState, Set<UUID>> stateMap;
   /**
-   * Represents the current stats of node.
-   */
-  private final ConcurrentHashMap<UUID, SCMNodeStat> nodeStats;
-  /**
    * Node to set of containers on the node.
    */
   private final ConcurrentHashMap<UUID, Set<ContainerID>> nodeToContainer;
@@ -63,7 +58,6 @@ public class NodeStateMap {
     lock = new ReentrantReadWriteLock();
     nodeMap = new ConcurrentHashMap<>();
     stateMap = new ConcurrentHashMap<>();
-    nodeStats = new ConcurrentHashMap<>();
     nodeToContainer = new ConcurrentHashMap<>();
     initStateMap();
   }
@@ -94,7 +88,6 @@ public class NodeStateMap {
         throw new NodeAlreadyExistsException("Node UUID: " + id);
       }
       nodeMap.put(id, new DatanodeInfo(datanodeDetails));
-      nodeStats.put(id, new SCMNodeStat());
       nodeToContainer.put(id, Collections.emptySet());
       stateMap.get(nodeState).add(id);
     } finally {
@@ -127,20 +120,6 @@ public class NodeStateMap {
   }
 
   /**
-   * Returns DatanodeDetails for the given node id.
-   *
-   * @param uuid Node Id
-   *
-   * @return DatanodeDetails of the node
-   *
-   * @throws NodeNotFoundException if the node is not present
-   */
-  public DatanodeDetails getNodeDetails(UUID uuid)
-      throws NodeNotFoundException {
-    return getNodeInfo(uuid);
-  }
-
-  /**
    * Returns DatanodeInfo for the given node id.
    *
    * @param uuid Node Id
@@ -245,43 +224,6 @@ public class NodeStateMap {
     }
   }
 
-  /**
-   * Returns the current stats of the node.
-   *
-   * @param uuid node id
-   *
-   * @return SCMNodeStat of the specify node.
-   *
-   * @throws NodeNotFoundException if the node is not found
-   */
-  public SCMNodeStat getNodeStat(UUID uuid) throws NodeNotFoundException {
-    SCMNodeStat stat = nodeStats.get(uuid);
-    if (stat == null) {
-      throw new NodeNotFoundException("Node UUID: " + uuid);
-    }
-    return stat;
-  }
-
-  /**
-   * Returns a unmodifiable copy of nodeStats.
-   *
-   * @return map with node stats.
-   */
-  public Map<UUID, SCMNodeStat> getNodeStats() {
-    return Collections.unmodifiableMap(nodeStats);
-  }
-
-  /**
-   * Set the current stats of the node.
-   *
-   * @param uuid node id
-   *
-   * @param newstat stat that will set to the specify node.
-   */
-  public void setNodeStat(UUID uuid, SCMNodeStat newstat) {
-    nodeStats.put(uuid, newstat);
-  }
-
   public void setContainers(UUID uuid, Set<ContainerID> containers)
       throws NodeNotFoundException{
     if (!nodeToContainer.containsKey(uuid)) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 33fa1fa..ffdea0e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -76,7 +76,7 @@ public class MockNodeManager implements NodeManager {
   private final List<DatanodeDetails> healthyNodes;
   private final List<DatanodeDetails> staleNodes;
   private final List<DatanodeDetails> deadNodes;
-  private final Map<UUID, SCMNodeStat> nodeMetricMap;
+  private final Map<DatanodeDetails, SCMNodeStat> nodeMetricMap;
   private final SCMNodeStat aggregateStat;
   private boolean chillmode;
   private final Map<UUID, List<SCMCommand>> commandMap;
@@ -114,7 +114,7 @@ public class MockNodeManager implements NodeManager {
     newStat.set(
         (NODES[x % NODES.length].capacity),
         (NODES[x % NODES.length].used), remaining);
-    this.nodeMetricMap.put(datanodeDetails.getUuid(), newStat);
+    this.nodeMetricMap.put(datanodeDetails, newStat);
     aggregateStat.add(newStat);
 
     if (NODES[x % NODES.length].getCurrentState() == NodeData.HEALTHY) {
@@ -201,7 +201,7 @@ public class MockNodeManager implements NodeManager {
    * @return a list of individual node stats (live/stale but not dead).
    */
   @Override
-  public Map<UUID, SCMNodeStat> getNodeStats() {
+  public Map<DatanodeDetails, SCMNodeStat> getNodeStats() {
     return nodeMetricMap;
   }
 
@@ -213,7 +213,7 @@ public class MockNodeManager implements NodeManager {
    */
   @Override
   public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) {
-    SCMNodeStat stat = nodeMetricMap.get(datanodeDetails.getUuid());
+    SCMNodeStat stat = nodeMetricMap.get(datanodeDetails);
     if (stat == null) {
       return null;
     }
@@ -413,12 +413,12 @@ public class MockNodeManager implements NodeManager {
    * @param size number of bytes.
    */
   public void addContainer(DatanodeDetails datanodeDetails, long size) {
-    SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid());
+    SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails);
     if (stat != null) {
       aggregateStat.subtract(stat);
       stat.getCapacity().add(size);
       aggregateStat.add(stat);
-      nodeMetricMap.put(datanodeDetails.getUuid(), stat);
+      nodeMetricMap.put(datanodeDetails, stat);
     }
   }
 
@@ -429,12 +429,12 @@ public class MockNodeManager implements NodeManager {
    * @param size number of bytes.
    */
   public void delContainer(DatanodeDetails datanodeDetails, long size) {
-    SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid());
+    SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails);
     if (stat != null) {
       aggregateStat.subtract(stat);
       stat.getCapacity().subtract(size);
       aggregateStat.add(stat);
-      nodeMetricMap.put(datanodeDetails.getUuid(), stat);
+      nodeMetricMap.put(datanodeDetails, stat);
     }
   }
 
@@ -445,21 +445,6 @@ public class MockNodeManager implements NodeManager {
         commandForDatanode.getCommand());
   }
 
-  /**
-   * Remove the node stats and update the storage stats
-   * in this Node Manager.
-   *
-   * @param dnUuid UUID of the datanode.
-   */
-  @Override
-  public void processDeadNode(UUID dnUuid) {
-    SCMNodeStat stat = this.nodeMetricMap.get(dnUuid);
-    if (stat != null) {
-      aggregateStat.subtract(stat);
-      stat.set(0, 0, 0);
-    }
-  }
-
   @Override
   public List<SCMCommand> getCommandQueue(UUID dnID) {
     return null;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 8dbf389..37b28eb 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.StorageReportProto;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
@@ -81,6 +82,10 @@ public class TestDeadNodeHandler {
     storageDir = GenericTestUtils.getTempPath(
         TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
+    conf.set(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, "100ms");
+    conf.set(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, "50ms");
+    conf.set(ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL, "1s");
+    conf.set(ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL, "2s");
     eventQueue = new EventQueue();
     scm = HddsTestUtils.getScm(conf);
     nodeManager = (SCMNodeManager) scm.getScmNodeManager();
@@ -237,20 +242,21 @@ public class TestDeadNodeHandler {
     Assert.assertTrue(nodeStat.get().getRemaining().get() == 90);
     Assert.assertTrue(nodeStat.get().getScmUsed().get() == 10);
 
-    //WHEN datanode1 is dead.
-    eventQueue.fireEvent(SCMEvents.DEAD_NODE, datanode1);
-    Thread.sleep(100);
+    //TODO: Support logic to mark a node as dead in NodeManager.
 
+    nodeManager.processHeartbeat(datanode2);
+    Thread.sleep(1000);
+    nodeManager.processHeartbeat(datanode2);
+    Thread.sleep(1000);
+    nodeManager.processHeartbeat(datanode2);
+    Thread.sleep(1000);
+    nodeManager.processHeartbeat(datanode2);
     //THEN statistics in SCM should changed.
     stat = nodeManager.getStats();
-    Assert.assertTrue(stat.getCapacity().get() == 200);
-    Assert.assertTrue(stat.getRemaining().get() == 180);
-    Assert.assertTrue(stat.getScmUsed().get() == 20);
-
-    nodeStat = nodeManager.getNodeStat(datanode1);
-    Assert.assertTrue(nodeStat.get().getCapacity().get() == 0);
-    Assert.assertTrue(nodeStat.get().getRemaining().get() == 0);
-    Assert.assertTrue(nodeStat.get().getScmUsed().get() == 0);
+    Assert.assertEquals(200L, stat.getCapacity().get().longValue());
+    Assert.assertEquals(180L,
+        stat.getRemaining().get().longValue());
+    Assert.assertEquals(20L, stat.getScmUsed().get().longValue());
   }
 
   @Test
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
index fa163eb..1cb9bcd 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
@@ -63,8 +63,7 @@ public class TestNodeReportHandler implements EventPublisher {
     SCMNodeMetric nodeMetric = nodeManager.getNodeStat(dn);
     Assert.assertNull(nodeMetric);
 
-    nodeReportHandler.onMessage(
-        getNodeReport(dn, storageOne), this);
+    nodeManager.register(dn, getNodeReport(dn, storageOne).getReport(), null);
     nodeMetric = nodeManager.getNodeStat(dn);
 
     Assert.assertTrue(nodeMetric.get().getCapacity().get() == 100);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 3f31708..35cc1aa9 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -126,7 +126,7 @@ public class ReplicationNodeManagerMock implements NodeManager {
    * @return a map of individual node stats (live/stale but not dead).
    */
   @Override
-  public Map<UUID, SCMNodeStat> getNodeStats() {
+  public Map<DatanodeDetails, SCMNodeStat> getNodeStats() {
     return null;
   }
 
@@ -305,15 +305,6 @@ public class ReplicationNodeManagerMock implements NodeManager {
     // do nothing.
   }
 
-  /**
-   * Empty implementation for processDeadNode.
-   * @param dnUuid
-   */
-  @Override
-  public void processDeadNode(UUID dnUuid) {
-    // do nothing.
-  }
-
   @Override
   public List<SCMCommand> getCommandQueue(UUID dnID) {
     return null;
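
One consumer-facing note: NodeManager#getNodeStats() now returns Map<DatanodeDetails, SCMNodeStat> instead of Map<UUID, SCMNodeStat>, and it covers only HEALTHY and STALE nodes. A rough sketch of how a caller can total cluster storage from that map, along the lines of the new getStats() (class and method names are illustrative only):

    import java.util.Map;

    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;

    /** Illustration only: total the map returned by NodeManager#getNodeStats(). */
    final class ClusterStatSketch {
      static SCMNodeStat clusterTotals(Map<DatanodeDetails, SCMNodeStat> nodeStats) {
        long capacity = 0L;
        long used = 0L;
        long remaining = 0L;
        // Each value is already the per-node aggregate built from that
        // node's storage reports; sum them for a cluster-wide view.
        for (SCMNodeStat stat : nodeStats.values()) {
          capacity += stat.getCapacity().get();
          used += stat.getScmUsed().get();
          remaining += stat.getRemaining().get();
        }
        return new SCMNodeStat(capacity, used, remaining);
      }
    }
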


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org