You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yq...@apache.org on 2019/02/08 15:51:08 UTC
[hadoop] branch trunk updated: HDDS-1048. Remove SCMNodeStat from
SCMNodeManager and use storage information from
DatanodeInfo#StorageReportProto. Contributed by Nanda kumar.
This is an automated email from the ASF dual-hosted git repository.
yqlin pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new fb8c997 HDDS-1048. Remove SCMNodeStat from SCMNodeManager and use storage information from DatanodeInfo#StorageReportProto. Contributed by Nanda kumar.
fb8c997 is described below
commit fb8c997a6884bbe19c45bab77950068bb78109c7
Author: Yiqun Lin <yq...@apache.org>
AuthorDate: Fri Feb 8 23:49:37 2019 +0800
HDDS-1048. Remove SCMNodeStat from SCMNodeManager and use storage information from DatanodeInfo#StorageReportProto. Contributed by Nanda kumar.
---
.../apache/hadoop/hdds/scm/node/DatanodeInfo.java | 7 +-
.../hadoop/hdds/scm/node/DeadNodeHandler.java | 1 -
.../apache/hadoop/hdds/scm/node/NodeManager.java | 15 +-
.../hadoop/hdds/scm/node/NodeStateManager.java | 65 ++------
.../hadoop/hdds/scm/node/SCMNodeManager.java | 169 ++++++++++-----------
.../hadoop/hdds/scm/node/states/NodeStateMap.java | 58 -------
.../hadoop/hdds/scm/container/MockNodeManager.java | 31 +---
.../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 28 ++--
.../hdds/scm/node/TestNodeReportHandler.java | 3 +-
.../testutils/ReplicationNodeManagerMock.java | 11 +-
10 files changed, 123 insertions(+), 265 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
index 26b8b95..d06ea2a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DatanodeInfo.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.util.Time;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -38,7 +39,6 @@ public class DatanodeInfo extends DatanodeDetails {
private volatile long lastHeartbeatTime;
private long lastStatsUpdatedTime;
- // If required we can dissect StorageReportProto and store the raw data
private List<StorageReportProto> storageReports;
/**
@@ -48,8 +48,9 @@ public class DatanodeInfo extends DatanodeDetails {
*/
public DatanodeInfo(DatanodeDetails datanodeDetails) {
super(datanodeDetails);
- lock = new ReentrantReadWriteLock();
- lastHeartbeatTime = Time.monotonicNow();
+ this.lock = new ReentrantReadWriteLock();
+ this.lastHeartbeatTime = Time.monotonicNow();
+ this.storageReports = Collections.emptyList();
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index 8e71399..a75a51a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -58,7 +58,6 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
@Override
public void onMessage(DatanodeDetails datanodeDetails,
EventPublisher publisher) {
- nodeManager.processDeadNode(datanodeDetails.getUuid());
// TODO: check if there are any pipeline on this node and fire close
// pipeline event
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index d8865a8..6b8d477 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -93,8 +93,7 @@ public interface NodeManager extends StorageContainerNodeProtocol,
* Return a map of node stats.
* @return a map of individual node stats (live/stale but not dead).
*/
- // TODO: try to change the return type to Map<DatanodeDetails, SCMNodeStat>
- Map<UUID, SCMNodeStat> getNodeStats();
+ Map<DatanodeDetails, SCMNodeStat> getNodeStats();
/**
* Return the node stat of the specified datanode.
@@ -159,17 +158,11 @@ public interface NodeManager extends StorageContainerNodeProtocol,
/**
* Process node report.
*
- * @param dnUuid
+ * @param datanodeDetails
* @param nodeReport
*/
- void processNodeReport(DatanodeDetails dnUuid, NodeReportProto nodeReport);
-
- /**
- * Process a dead node event in this Node Manager.
- *
- * @param dnUuid datanode uuid.
- */
- void processDeadNode(UUID dnUuid);
+ void processNodeReport(DatanodeDetails datanodeDetails,
+ NodeReportProto nodeReport);
/**
* Get list of SCMCommands in the Command Queue for a particular Datanode.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index 5ff9dfa..c54944b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.states.*;
import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
@@ -276,20 +275,6 @@ public class NodeStateManager implements Runnable, Closeable {
}
/**
- * Get information about the node.
- *
- * @param datanodeUUID datanode UUID
- *
- * @return DatanodeInfo
- *
- * @throws NodeNotFoundException if the node is not present
- */
- public DatanodeInfo getNode(UUID datanodeUUID)
- throws NodeNotFoundException {
- return nodeStateMap.getNodeInfo(datanodeUUID);
- }
-
- /**
* Updates the last heartbeat time of the node.
*
* @throws NodeNotFoundException if the node is not present
@@ -319,7 +304,7 @@ public class NodeStateManager implements Runnable, Closeable {
*
* @return list of healthy nodes
*/
- public List<DatanodeDetails> getHealthyNodes() {
+ public List<DatanodeInfo> getHealthyNodes() {
return getNodes(NodeState.HEALTHY);
}
@@ -328,7 +313,7 @@ public class NodeStateManager implements Runnable, Closeable {
*
* @return list of stale nodes
*/
- public List<DatanodeDetails> getStaleNodes() {
+ public List<DatanodeInfo> getStaleNodes() {
return getNodes(NodeState.STALE);
}
@@ -337,7 +322,7 @@ public class NodeStateManager implements Runnable, Closeable {
*
* @return list of dead nodes
*/
- public List<DatanodeDetails> getDeadNodes() {
+ public List<DatanodeInfo> getDeadNodes() {
return getNodes(NodeState.DEAD);
}
@@ -348,12 +333,12 @@ public class NodeStateManager implements Runnable, Closeable {
*
* @return list of nodes
*/
- public List<DatanodeDetails> getNodes(NodeState state) {
- List<DatanodeDetails> nodes = new ArrayList<>();
+ public List<DatanodeInfo> getNodes(NodeState state) {
+ List<DatanodeInfo> nodes = new ArrayList<>();
nodeStateMap.getNodes(state).forEach(
uuid -> {
try {
- nodes.add(nodeStateMap.getNodeDetails(uuid));
+ nodes.add(nodeStateMap.getNodeInfo(uuid));
} catch (NodeNotFoundException e) {
// This should not happen unless someone else other than
// NodeStateManager is directly modifying NodeStateMap and removed
@@ -369,12 +354,12 @@ public class NodeStateManager implements Runnable, Closeable {
*
* @return all the managed nodes
*/
- public List<DatanodeDetails> getAllNodes() {
- List<DatanodeDetails> nodes = new ArrayList<>();
+ public List<DatanodeInfo> getAllNodes() {
+ List<DatanodeInfo> nodes = new ArrayList<>();
nodeStateMap.getAllNodes().forEach(
uuid -> {
try {
- nodes.add(nodeStateMap.getNodeDetails(uuid));
+ nodes.add(nodeStateMap.getNodeInfo(uuid));
} catch (NodeNotFoundException e) {
// This should not happen unless someone else other than
// NodeStateManager is directly modifying NodeStateMap and removed
@@ -442,38 +427,6 @@ public class NodeStateManager implements Runnable, Closeable {
}
/**
- * Returns the current stats of the node.
- *
- * @param uuid node id
- *
- * @return SCMNodeStat
- *
- * @throws NodeNotFoundException if the node is not present
- */
- public SCMNodeStat getNodeStat(UUID uuid) throws NodeNotFoundException {
- return nodeStateMap.getNodeStat(uuid);
- }
-
- /**
- * Returns a unmodifiable copy of nodeStats.
- * @return map with node stats.
- */
- public Map<UUID, SCMNodeStat> getNodeStatsMap() {
- return nodeStateMap.getNodeStats();
- }
-
- /**
- * Set the stat for the node.
- *
- * @param uuid node id.
- *
- * @param newstat new stat that will set to the specify node.
- */
- public void setNodeStat(UUID uuid, SCMNodeStat newstat) {
- nodeStateMap.setNodeStat(uuid, newstat);
- }
-
- /**
* Removes a pipeline from the node2PipelineMap.
* @param pipeline - Pipeline to be removed
*/
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 61ddb5b..3c5eaf8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -19,7 +19,8 @@ package org.apache.hadoop.hdds.scm.node;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -65,6 +66,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.stream.Collectors;
/**
* Maintains information about the Datanodes on SCM side.
@@ -80,18 +82,13 @@ import java.util.UUID;
* get functions in this file as a snap-shot of information that is inconsistent
* as soon as you read it.
*/
-public class SCMNodeManager
- implements NodeManager, StorageContainerNodeProtocol {
+public class SCMNodeManager implements NodeManager {
@VisibleForTesting
static final Logger LOG =
LoggerFactory.getLogger(SCMNodeManager.class);
private final NodeStateManager nodeStateManager;
- // Should we maintain aggregated stats? If this is not frequently used, we
- // can always calculate it from nodeStats whenever required.
- // Aggregated node stats
- private SCMNodeStat scmStat;
private final String clusterID;
private final VersionInfo version;
private final CommandQueue commandQueue;
@@ -108,7 +105,6 @@ public class SCMNodeManager
StorageContainerManager scmManager, EventPublisher eventPublisher)
throws IOException {
this.nodeStateManager = new NodeStateManager(conf, eventPublisher);
- this.scmStat = new SCMNodeStat();
this.clusterID = clusterID;
this.version = VersionInfo.getLatestVersion();
this.commandQueue = new CommandQueue();
@@ -131,7 +127,7 @@ public class SCMNodeManager
/**
- * Gets all datanodes that are in a certain state. This function works by
+ * Returns all datanodes that are in the given state. This function works by
* taking a snapshot of the current collection and then returning the list
* from that collection. This means that real map might have changed by the
* time we return this list.
@@ -140,7 +136,8 @@ public class SCMNodeManager
*/
@Override
public List<DatanodeDetails> getNodes(NodeState nodestate) {
- return nodeStateManager.getNodes(nodestate);
+ return nodeStateManager.getNodes(nodestate).stream()
+ .map(node -> (DatanodeDetails)node).collect(Collectors.toList());
}
/**
@@ -150,13 +147,14 @@ public class SCMNodeManager
*/
@Override
public List<DatanodeDetails> getAllNodes() {
- return nodeStateManager.getAllNodes();
+ return nodeStateManager.getAllNodes().stream()
+ .map(node -> (DatanodeDetails)node).collect(Collectors.toList());
}
/**
* Returns the Number of Datanodes by State they are in.
*
- * @return int -- count
+ * @return count
*/
@Override
public int getNodeCount(NodeState nodestate) {
@@ -166,7 +164,7 @@ public class SCMNodeManager
/**
* Returns the node state of a specific node.
*
- * @param datanodeDetails - Datanode Details
+ * @param datanodeDetails Datanode Details
* @return Healthy/Stale/Dead/Unknown.
*/
@Override
@@ -179,47 +177,6 @@ public class SCMNodeManager
}
}
-
- private void updateNodeStat(UUID dnId, NodeReportProto nodeReport) {
- SCMNodeStat stat;
- try {
- stat = nodeStateManager.getNodeStat(dnId);
-
- // Updating the storage report for the datanode.
- // I dont think we will get NotFound exception, as we are taking
- // nodeInfo from nodeStateMap, as I see it is not being removed from
- // the map, just we change the states. And during first time
- // registration we call this, after adding to nodeStateMap. And also
- // from eventhandler it is called only if it has node Report.
- DatanodeInfo datanodeInfo = nodeStateManager.getNode(dnId);
- if (nodeReport != null) {
- datanodeInfo.updateStorageReports(nodeReport.getStorageReportList());
- }
-
- } catch (NodeNotFoundException e) {
- LOG.debug("SCM updateNodeStat based on heartbeat from previous " +
- "dead datanode {}", dnId);
- stat = new SCMNodeStat();
- }
-
- if (nodeReport != null && nodeReport.getStorageReportCount() > 0) {
- long totalCapacity = 0;
- long totalRemaining = 0;
- long totalScmUsed = 0;
- List<StorageReportProto> storageReports = nodeReport
- .getStorageReportList();
- for (StorageReportProto report : storageReports) {
- totalCapacity += report.getCapacity();
- totalRemaining += report.getRemaining();
- totalScmUsed+= report.getScmUsed();
- }
- scmStat.subtract(stat);
- stat.set(totalCapacity, totalScmUsed, totalRemaining);
- scmStat.add(stat);
- }
- nodeStateManager.setNodeStat(dnId, stat);
- }
-
/**
* Closes this stream and releases any system resources associated with it. If
* the stream is already closed then invoking this method has no effect.
@@ -275,7 +232,7 @@ public class SCMNodeManager
try {
nodeStateManager.addNode(datanodeDetails);
// Updating Node Report, as registration is successful
- updateNodeStat(datanodeDetails.getUuid(), nodeReport);
+ processNodeReport(datanodeDetails, nodeReport);
LOG.info("Registered Data node : {}", datanodeDetails);
} catch (NodeAlreadyExistsException e) {
LOG.trace("Datanode is already registered. Datanode: {}",
@@ -321,13 +278,21 @@ public class SCMNodeManager
/**
* Process node report.
*
- * @param dnUuid
+ * @param datanodeDetails
* @param nodeReport
*/
@Override
- public void processNodeReport(DatanodeDetails dnUuid,
+ public void processNodeReport(DatanodeDetails datanodeDetails,
NodeReportProto nodeReport) {
- this.updateNodeStat(dnUuid.getUuid(), nodeReport);
+ try {
+ DatanodeInfo datanodeInfo = nodeStateManager.getNode(datanodeDetails);
+ if (nodeReport != null) {
+ datanodeInfo.updateStorageReports(nodeReport.getStorageReportList());
+ }
+ } catch (NodeNotFoundException e) {
+ LOG.warn("Got node report from unregistered datanode {}",
+ datanodeDetails);
+ }
}
/**
@@ -336,7 +301,16 @@ public class SCMNodeManager
*/
@Override
public SCMNodeStat getStats() {
- return new SCMNodeStat(this.scmStat);
+ long capacity = 0L;
+ long used = 0L;
+ long remaining = 0L;
+
+ for (SCMNodeStat stat : getNodeStats().values()) {
+ capacity += stat.getCapacity().get();
+ used += stat.getScmUsed().get();
+ remaining += stat.getRemaining().get();
+ }
+ return new SCMNodeStat(capacity, used, remaining);
}
/**
@@ -344,8 +318,24 @@ public class SCMNodeManager
* @return a map of individual node stats (live/stale but not dead).
*/
@Override
- public Map<UUID, SCMNodeStat> getNodeStats() {
- return nodeStateManager.getNodeStatsMap();
+ public Map<DatanodeDetails, SCMNodeStat> getNodeStats() {
+
+ final Map<DatanodeDetails, SCMNodeStat> nodeStats = new HashMap<>();
+
+ final List<DatanodeInfo> healthyNodes = nodeStateManager
+ .getNodes(NodeState.HEALTHY);
+ final List<DatanodeInfo> staleNodes = nodeStateManager
+ .getNodes(NodeState.STALE);
+ final List<DatanodeInfo> datanodes = new ArrayList<>(healthyNodes);
+ datanodes.addAll(staleNodes);
+
+ for (DatanodeInfo dnInfo : datanodes) {
+ SCMNodeStat nodeStat = getNodeStatInternal(dnInfo);
+ if (nodeStat != null) {
+ nodeStats.put(dnInfo, nodeStat);
+ }
+ }
+ return nodeStats;
}
/**
@@ -356,11 +346,28 @@ public class SCMNodeManager
*/
@Override
public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) {
+ final SCMNodeStat nodeStat = getNodeStatInternal(datanodeDetails);
+ return nodeStat != null ? new SCMNodeMetric(nodeStat) : null;
+ }
+
+ private SCMNodeStat getNodeStatInternal(DatanodeDetails datanodeDetails) {
try {
- return new SCMNodeMetric(
- nodeStateManager.getNodeStat(datanodeDetails.getUuid()));
+ long capacity = 0L;
+ long used = 0L;
+ long remaining = 0L;
+
+ final DatanodeInfo datanodeInfo = nodeStateManager
+ .getNode(datanodeDetails);
+ final List<StorageReportProto> storageReportProtos = datanodeInfo
+ .getStorageReports();
+ for (StorageReportProto reportProto : storageReportProtos) {
+ capacity += reportProto.getCapacity();
+ used += reportProto.getScmUsed();
+ remaining += reportProto.getRemaining();
+ }
+ return new SCMNodeStat(capacity, used, remaining);
} catch (NodeNotFoundException e) {
- LOG.info("SCM getNodeStat from a decommissioned or removed datanode {}",
+ LOG.warn("Cannot generate NodeStat, datanode {} not found.",
datanodeDetails.getUuid());
return null;
}
@@ -375,6 +382,8 @@ public class SCMNodeManager
return nodeCountMap;
}
+ // We should introduce the notion of storage types (DISK, SSD, etc.)
+ // in SCMNodeStat and try to use it here.
@Override
public Map<String, Long> getNodeInfo() {
long diskCapacity = 0L;
@@ -385,14 +394,15 @@ public class SCMNodeManager
long ssdUsed = 0L;
long ssdRemaining = 0L;
- List<DatanodeDetails> healthyNodes = getNodes(NodeState.HEALTHY);
- List<DatanodeDetails> staleNodes = getNodes(NodeState.STALE);
+ List<DatanodeInfo> healthyNodes = nodeStateManager
+ .getNodes(NodeState.HEALTHY);
+ List<DatanodeInfo> staleNodes = nodeStateManager
+ .getNodes(NodeState.STALE);
- List<DatanodeDetails> datanodes = new ArrayList<>(healthyNodes);
+ List<DatanodeInfo> datanodes = new ArrayList<>(healthyNodes);
datanodes.addAll(staleNodes);
- for (DatanodeDetails datanodeDetails : datanodes) {
- DatanodeInfo dnInfo = (DatanodeInfo) datanodeDetails;
+ for (DatanodeInfo dnInfo : datanodes) {
List<StorageReportProto> storageReportProtos = dnInfo.getStorageReports();
for (StorageReportProto reportProto : storageReportProtos) {
if (reportProto.getStorageType() ==
@@ -498,27 +508,6 @@ public class SCMNodeManager
commandForDatanode.getCommand());
}
- /**
- * Update the node stats and cluster storage stats in this SCM Node Manager.
- *
- * @param dnUuid datanode uuid.
- */
- @Override
- // TODO: This should be removed.
- public void processDeadNode(UUID dnUuid) {
- try {
- SCMNodeStat stat = nodeStateManager.getNodeStat(dnUuid);
- if (stat != null) {
- LOG.trace("Update stat values as Datanode {} is dead.", dnUuid);
- scmStat.subtract(stat);
- stat.set(0, 0, 0);
- }
- } catch (NodeNotFoundException e) {
- LOG.warn("Can't update stats based on message of dead Datanode {}, it"
- + " doesn't exist or decommissioned already.", dnUuid);
- }
- }
-
@Override
public List<SCMCommand> getCommandQueue(UUID dnID) {
return commandQueue.getCommand(dnID);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
index a68e2b5..fd87cca 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/NodeStateMap.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdds.scm.node.states;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import java.util.*;
@@ -46,10 +45,6 @@ public class NodeStateMap {
*/
private final ConcurrentHashMap<NodeState, Set<UUID>> stateMap;
/**
- * Represents the current stats of node.
- */
- private final ConcurrentHashMap<UUID, SCMNodeStat> nodeStats;
- /**
* Node to set of containers on the node.
*/
private final ConcurrentHashMap<UUID, Set<ContainerID>> nodeToContainer;
@@ -63,7 +58,6 @@ public class NodeStateMap {
lock = new ReentrantReadWriteLock();
nodeMap = new ConcurrentHashMap<>();
stateMap = new ConcurrentHashMap<>();
- nodeStats = new ConcurrentHashMap<>();
nodeToContainer = new ConcurrentHashMap<>();
initStateMap();
}
@@ -94,7 +88,6 @@ public class NodeStateMap {
throw new NodeAlreadyExistsException("Node UUID: " + id);
}
nodeMap.put(id, new DatanodeInfo(datanodeDetails));
- nodeStats.put(id, new SCMNodeStat());
nodeToContainer.put(id, Collections.emptySet());
stateMap.get(nodeState).add(id);
} finally {
@@ -127,20 +120,6 @@ public class NodeStateMap {
}
/**
- * Returns DatanodeDetails for the given node id.
- *
- * @param uuid Node Id
- *
- * @return DatanodeDetails of the node
- *
- * @throws NodeNotFoundException if the node is not present
- */
- public DatanodeDetails getNodeDetails(UUID uuid)
- throws NodeNotFoundException {
- return getNodeInfo(uuid);
- }
-
- /**
* Returns DatanodeInfo for the given node id.
*
* @param uuid Node Id
@@ -245,43 +224,6 @@ public class NodeStateMap {
}
}
- /**
- * Returns the current stats of the node.
- *
- * @param uuid node id
- *
- * @return SCMNodeStat of the specify node.
- *
- * @throws NodeNotFoundException if the node is not found
- */
- public SCMNodeStat getNodeStat(UUID uuid) throws NodeNotFoundException {
- SCMNodeStat stat = nodeStats.get(uuid);
- if (stat == null) {
- throw new NodeNotFoundException("Node UUID: " + uuid);
- }
- return stat;
- }
-
- /**
- * Returns a unmodifiable copy of nodeStats.
- *
- * @return map with node stats.
- */
- public Map<UUID, SCMNodeStat> getNodeStats() {
- return Collections.unmodifiableMap(nodeStats);
- }
-
- /**
- * Set the current stats of the node.
- *
- * @param uuid node id
- *
- * @param newstat stat that will set to the specify node.
- */
- public void setNodeStat(UUID uuid, SCMNodeStat newstat) {
- nodeStats.put(uuid, newstat);
- }
-
public void setContainers(UUID uuid, Set<ContainerID> containers)
throws NodeNotFoundException{
if (!nodeToContainer.containsKey(uuid)) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 33fa1fa..ffdea0e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -76,7 +76,7 @@ public class MockNodeManager implements NodeManager {
private final List<DatanodeDetails> healthyNodes;
private final List<DatanodeDetails> staleNodes;
private final List<DatanodeDetails> deadNodes;
- private final Map<UUID, SCMNodeStat> nodeMetricMap;
+ private final Map<DatanodeDetails, SCMNodeStat> nodeMetricMap;
private final SCMNodeStat aggregateStat;
private boolean chillmode;
private final Map<UUID, List<SCMCommand>> commandMap;
@@ -114,7 +114,7 @@ public class MockNodeManager implements NodeManager {
newStat.set(
(NODES[x % NODES.length].capacity),
(NODES[x % NODES.length].used), remaining);
- this.nodeMetricMap.put(datanodeDetails.getUuid(), newStat);
+ this.nodeMetricMap.put(datanodeDetails, newStat);
aggregateStat.add(newStat);
if (NODES[x % NODES.length].getCurrentState() == NodeData.HEALTHY) {
@@ -201,7 +201,7 @@ public class MockNodeManager implements NodeManager {
* @return a list of individual node stats (live/stale but not dead).
*/
@Override
- public Map<UUID, SCMNodeStat> getNodeStats() {
+ public Map<DatanodeDetails, SCMNodeStat> getNodeStats() {
return nodeMetricMap;
}
@@ -213,7 +213,7 @@ public class MockNodeManager implements NodeManager {
*/
@Override
public SCMNodeMetric getNodeStat(DatanodeDetails datanodeDetails) {
- SCMNodeStat stat = nodeMetricMap.get(datanodeDetails.getUuid());
+ SCMNodeStat stat = nodeMetricMap.get(datanodeDetails);
if (stat == null) {
return null;
}
@@ -413,12 +413,12 @@ public class MockNodeManager implements NodeManager {
* @param size number of bytes.
*/
public void addContainer(DatanodeDetails datanodeDetails, long size) {
- SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid());
+ SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails);
if (stat != null) {
aggregateStat.subtract(stat);
stat.getCapacity().add(size);
aggregateStat.add(stat);
- nodeMetricMap.put(datanodeDetails.getUuid(), stat);
+ nodeMetricMap.put(datanodeDetails, stat);
}
}
@@ -429,12 +429,12 @@ public class MockNodeManager implements NodeManager {
* @param size number of bytes.
*/
public void delContainer(DatanodeDetails datanodeDetails, long size) {
- SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails.getUuid());
+ SCMNodeStat stat = this.nodeMetricMap.get(datanodeDetails);
if (stat != null) {
aggregateStat.subtract(stat);
stat.getCapacity().subtract(size);
aggregateStat.add(stat);
- nodeMetricMap.put(datanodeDetails.getUuid(), stat);
+ nodeMetricMap.put(datanodeDetails, stat);
}
}
@@ -445,21 +445,6 @@ public class MockNodeManager implements NodeManager {
commandForDatanode.getCommand());
}
- /**
- * Remove the node stats and update the storage stats
- * in this Node Manager.
- *
- * @param dnUuid UUID of the datanode.
- */
- @Override
- public void processDeadNode(UUID dnUuid) {
- SCMNodeStat stat = this.nodeMetricMap.get(dnUuid);
- if (stat != null) {
- aggregateStat.subtract(stat);
- stat.set(0, 0, 0);
- }
- }
-
@Override
public List<SCMCommand> getCommandQueue(UUID dnID) {
return null;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 8dbf389..37b28eb 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
@@ -81,6 +82,10 @@ public class TestDeadNodeHandler {
storageDir = GenericTestUtils.getTempPath(
TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
+ conf.set(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, "100ms");
+ conf.set(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, "50ms");
+ conf.set(ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL, "1s");
+ conf.set(ScmConfigKeys.OZONE_SCM_DEADNODE_INTERVAL, "2s");
eventQueue = new EventQueue();
scm = HddsTestUtils.getScm(conf);
nodeManager = (SCMNodeManager) scm.getScmNodeManager();
@@ -237,20 +242,21 @@ public class TestDeadNodeHandler {
Assert.assertTrue(nodeStat.get().getRemaining().get() == 90);
Assert.assertTrue(nodeStat.get().getScmUsed().get() == 10);
- //WHEN datanode1 is dead.
- eventQueue.fireEvent(SCMEvents.DEAD_NODE, datanode1);
- Thread.sleep(100);
+ //TODO: Support logic to mark a node as dead in NodeManager.
+ nodeManager.processHeartbeat(datanode2);
+ Thread.sleep(1000);
+ nodeManager.processHeartbeat(datanode2);
+ Thread.sleep(1000);
+ nodeManager.processHeartbeat(datanode2);
+ Thread.sleep(1000);
+ nodeManager.processHeartbeat(datanode2);
//THEN statistics in SCM should changed.
stat = nodeManager.getStats();
- Assert.assertTrue(stat.getCapacity().get() == 200);
- Assert.assertTrue(stat.getRemaining().get() == 180);
- Assert.assertTrue(stat.getScmUsed().get() == 20);
-
- nodeStat = nodeManager.getNodeStat(datanode1);
- Assert.assertTrue(nodeStat.get().getCapacity().get() == 0);
- Assert.assertTrue(nodeStat.get().getRemaining().get() == 0);
- Assert.assertTrue(nodeStat.get().getScmUsed().get() == 0);
+ Assert.assertEquals(200L, stat.getCapacity().get().longValue());
+ Assert.assertEquals(180L,
+ stat.getRemaining().get().longValue());
+ Assert.assertEquals(20L, stat.getScmUsed().get().longValue());
}
@Test
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
index fa163eb..1cb9bcd 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java
@@ -63,8 +63,7 @@ public class TestNodeReportHandler implements EventPublisher {
SCMNodeMetric nodeMetric = nodeManager.getNodeStat(dn);
Assert.assertNull(nodeMetric);
- nodeReportHandler.onMessage(
- getNodeReport(dn, storageOne), this);
+ nodeManager.register(dn, getNodeReport(dn, storageOne).getReport(), null);
nodeMetric = nodeManager.getNodeStat(dn);
Assert.assertTrue(nodeMetric.get().getCapacity().get() == 100);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 3f31708..35cc1aa9 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -126,7 +126,7 @@ public class ReplicationNodeManagerMock implements NodeManager {
* @return a map of individual node stats (live/stale but not dead).
*/
@Override
- public Map<UUID, SCMNodeStat> getNodeStats() {
+ public Map<DatanodeDetails, SCMNodeStat> getNodeStats() {
return null;
}
@@ -305,15 +305,6 @@ public class ReplicationNodeManagerMock implements NodeManager {
// do nothing.
}
- /**
- * Empty implementation for processDeadNode.
- * @param dnUuid
- */
- @Override
- public void processDeadNode(UUID dnUuid) {
- // do nothing.
- }
-
@Override
public List<SCMCommand> getCommandQueue(UUID dnID) {
return null;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org