Posted to commits@ozone.apache.org by sa...@apache.org on 2019/11/25 07:16:48 UTC
[hadoop-ozone] 01/04: HDDS-1577. Add default pipeline placement policy implementation. (#1366)
This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 748cb018a0b20b0e0993565ce15c8ad908d6d1e2
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Thu Sep 5 11:51:40 2019 +0800
HDDS-1577. Add default pipeline placement policy implementation. (#1366)
(cherry picked from commit b640a5f6d53830aee4b9c2a7d17bf57c987962cd)
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 5 +
.../common/src/main/resources/ozone-default.xml | 7 +
.../apache/hadoop/hdds/scm/node/NodeManager.java | 14 +
.../hadoop/hdds/scm/node/NodeStateManager.java | 9 +
.../hadoop/hdds/scm/node/SCMNodeManager.java | 19 ++
.../hdds/scm/node/states/Node2ObjectsMap.java | 4 +-
.../hdds/scm/node/states/Node2PipelineMap.java | 12 +-
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 338 +++++++++++++++++++++
.../hadoop/hdds/scm/container/MockNodeManager.java | 36 ++-
.../scm/pipeline/TestPipelinePlacementPolicy.java | 197 ++++++++++++
.../testutils/ReplicationNodeManagerMock.java | 16 +
11 files changed, 654 insertions(+), 3 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 3c35e56..e6fed5b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -315,6 +315,11 @@ public final class ScmConfigKeys {
public static final String OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT =
"ozone.scm.pipeline.owner.container.count";
public static final int OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT = 3;
+ // Pipeline placement policy:
+ // the max number of pipelines a single datanode can be engaged in.
+ public static final String OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT =
+ "ozone.scm.datanode.max.pipeline.engagement";
+ public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 5;
public static final String
OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 8110242..94e8557 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -839,6 +839,13 @@
</description>
</property>
<property>
+ <name>ozone.scm.datanode.max.pipeline.engagement</name>
+ <value>5</value>
+ <tag>OZONE, SCM, PIPELINE</tag>
+ <description>The maximum number of pipelines a single datanode can be engaged in.
+ </description>
+ </property>
+ <property>
<name>ozone.scm.container.size</name>
<value>5GB</value>
<tag>OZONE, PERFORMANCE, MANAGEMENT</tag>
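
For reference, the same limit can also be overridden programmatically before SCM starts; a minimal sketch using the key added above (the value 3 is only an illustrative choice, not a recommendation):

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    // Lower the per-datanode pipeline limit from the default of 5 to 3.
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
    // Datanodes already engaged in more than 3 pipelines will then be
    // filtered out by the placement policy introduced below.
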
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index fd8bb87..37562fe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm.node;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -118,6 +119,13 @@ public interface NodeManager extends StorageContainerNodeProtocol,
Set<PipelineID> getPipelines(DatanodeDetails datanodeDetails);
/**
+ * Get the count of pipelines a datanode is associated with.
+ * @param datanodeDetails DatanodeDetails
+ * @return The number of pipelines
+ */
+ int getPipelinesCount(DatanodeDetails datanodeDetails);
+
+ /**
* Add pipeline information in the NodeManager.
* @param pipeline - Pipeline to be added
*/
@@ -199,4 +207,10 @@ public interface NodeManager extends StorageContainerNodeProtocol,
* @return the given datanode, or empty list if none found
*/
List<DatanodeDetails> getNodesByAddress(String address);
+
+ /**
+ * Get the cluster map, i.e. the network topology, for this node manager.
+ * @return cluster map
+ */
+ NetworkTopology getClusterNetworkTopologyMap();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index 954cb0e..9d2a9f2 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -284,6 +284,15 @@ public class NodeStateManager implements Runnable, Closeable {
}
/**
+ * Get the count of pipelines associated with a single datanode.
+ * @param datanodeDetails single datanode
+ * @return number of pipelines associated with it
+ */
+ public int getPipelinesCount(DatanodeDetails datanodeDetails) {
+ return node2PipelineMap.getPipelinesCount(datanodeDetails.getUuid());
+ }
+
+ /**
* Get information about the node.
*
* @param datanodeDetails DatanodeDetails
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index f077e72..66cca46 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -510,6 +510,16 @@ public class SCMNodeManager implements NodeManager {
}
/**
+ * Get the count of pipelines a datanode is associated with.
+ * @param datanodeDetails DatanodeDetails
+ * @return The number of pipelines
+ */
+ @Override
+ public int getPipelinesCount(DatanodeDetails datanodeDetails) {
+ return nodeStateManager.getPipelinesCount(datanodeDetails);
+ }
+
+ /**
* Add pipeline information in the NodeManager.
*
* @param pipeline - Pipeline to be added
@@ -643,6 +653,15 @@ public class SCMNodeManager implements NodeManager {
return results;
}
+ /**
+ * Get the cluster map, i.e. the network topology, for this node manager.
+ * @return cluster map
+ */
+ @Override
+ public NetworkTopology getClusterNetworkTopologyMap() {
+ return clusterMap;
+ }
+
private String nodeResolve(String hostname) {
List<String> hosts = new ArrayList<>(1);
hosts.add(hostname);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
index 37525b0..57a377d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2ObjectsMap.java
@@ -67,6 +67,7 @@ public class Node2ObjectsMap<T> {
* @param datanodeID -- Datanode UUID
* @param containerIDs - List of ContainerIDs.
*/
+ @VisibleForTesting
public void insertNewDatanode(UUID datanodeID, Set<T> containerIDs)
throws SCMException {
Preconditions.checkNotNull(containerIDs);
@@ -83,7 +84,8 @@ public class Node2ObjectsMap<T> {
*
* @param datanodeID - Datanode ID.
*/
- void removeDatanode(UUID datanodeID) {
+ @VisibleForTesting
+ public void removeDatanode(UUID datanodeID) {
Preconditions.checkNotNull(datanodeID);
dn2ObjectMap.computeIfPresent(datanodeID, (k, v) -> null);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
index f8633f9..714188d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
@@ -42,7 +42,7 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
}
/**
- * Returns null if there no pipelines associated with this datanode ID.
+ * Returns null if there are no pipelines associated with this datanode ID.
*
* @param datanode - UUID
* @return Set of pipelines or Null.
@@ -52,6 +52,16 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
}
/**
+ * Return 0 if there are no pipelines associated with this datanode ID.
+ * @param datanode - UUID
+ * @return Number of pipelines or 0.
+ */
+ public int getPipelinesCount(UUID datanode) {
+ Set<PipelineID> pipelines = getObjects(datanode);
+ return pipelines == null ? 0 : pipelines.size();
+ }
+
+ /**
* Adds a pipeline entry to a given dataNode in the map.
*
* @param pipeline Pipeline to be added
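
A quick sketch of the new accessor's contract (patterned on how the unit test below uses it; the variable names are illustrative, and the calls would live in a method declared to throw SCMException):

    Node2PipelineMap map = new Node2PipelineMap();
    UUID dnId = UUID.randomUUID();
    // No entry for this datanode yet, so the count is 0 rather than null.
    assert map.getPipelinesCount(dnId) == 0;
    map.insertNewDatanode(dnId, Collections.singleton(PipelineID.randomId()));
    // The datanode is now engaged in exactly one pipeline.
    assert map.getPipelinesCount(dnId) == 1;
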
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
new file mode 100644
index 0000000..cb9954d
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -0,0 +1,338 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMCommonPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.Node;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Pipeline placement policy that chooses datanodes based on load balancing
+ * and network topology to supply pipeline creation.
+ * <p>
+ * 1. Get a list of healthy nodes.
+ * 2. Filter out nodes that are too heavily engaged in other pipelines.
+ * 3. Choose an anchor node among the viable nodes.
+ * 4. Choose other nodes around the anchor node based on network topology.
+ */
+public final class PipelinePlacementPolicy extends SCMCommonPolicy {
+ @VisibleForTesting
+ static final Logger LOG =
+ LoggerFactory.getLogger(PipelinePlacementPolicy.class);
+ private final NodeManager nodeManager;
+ private final Configuration conf;
+ private final int heavyNodeCriteria;
+
+ /**
+ * Constructs a pipeline placement policy that considers network topology,
+ * load balancing and rack awareness.
+ *
+ * @param nodeManager Node Manager
+ * @param conf Configuration
+ */
+ public PipelinePlacementPolicy(
+ final NodeManager nodeManager, final Configuration conf) {
+ super(nodeManager, conf);
+ this.nodeManager = nodeManager;
+ this.conf = conf;
+ heavyNodeCriteria = conf.getInt(
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
+ }
+
+ /**
+ * Returns true if this node meets the criteria.
+ *
+ * @param datanodeDetails DatanodeDetails
+ * @return true if the node's pipeline engagement is within the given limit.
+ */
+ @VisibleForTesting
+ boolean meetCriteria(DatanodeDetails datanodeDetails, long heavyNodeLimit) {
+ return (nodeManager.getPipelinesCount(datanodeDetails) <= heavyNodeLimit);
+ }
+
+ /**
+ * Filter the datanodes down to viable nodes:
+ * 1. nodes that are healthy
+ * 2. nodes that are not too heavily engaged in other pipelines
+ *
+ * @param excludedNodes - excluded nodes
+ * @param nodesRequired - number of datanodes required.
+ * @return a list of viable nodes
+ * @throws SCMException when there are not enough viable nodes
+ */
+ List<DatanodeDetails> filterViableNodes(
+ List<DatanodeDetails> excludedNodes, int nodesRequired)
+ throws SCMException {
+ // get nodes in HEALTHY state
+ List<DatanodeDetails> healthyNodes =
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+ if (excludedNodes != null) {
+ healthyNodes.removeAll(excludedNodes);
+ }
+ String msg;
+ if (healthyNodes.size() == 0) {
+ msg = "No healthy node found to allocate pipeline.";
+ LOG.error(msg);
+ throw new SCMException(msg, SCMException.ResultCodes
+ .FAILED_TO_FIND_HEALTHY_NODES);
+ }
+
+ if (healthyNodes.size() < nodesRequired) {
+ msg = String.format("Not enough healthy nodes to allocate pipeline. %d "
+ + " datanodes required. Found %d",
+ nodesRequired, healthyNodes.size());
+ LOG.error(msg);
+ throw new SCMException(msg,
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+
+ // Filter nodes that meet the pipeline engagement criterion.
+ // Pipeline placement doesn't take the node's remaining space into account.
+ List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
+ meetCriteria(d, heavyNodeCriteria)).collect(Collectors.toList());
+
+ if (healthyList.size() < nodesRequired) {
+ msg = String.format("Unable to find enough nodes that meet " +
+ "the criteria that cannot engage in more than %d pipelines." +
+ " Nodes required: %d Found: %d",
+ heavyNodeCriteria, nodesRequired, healthyList.size());
+ LOG.error(msg);
+ throw new SCMException(msg,
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+ return healthyList;
+ }
+
+ /**
+ * Pipeline placement chooses datanodes to join the pipeline.
+ *
+ * @param excludedNodes - excluded nodes
+ * @param favoredNodes - list of nodes preferred.
+ * @param nodesRequired - number of datanodes required.
+ * @param sizeRequired - size required for the container or block.
+ * @return a list of chosen datanodeDetails
+ * @throws SCMException when not enough suitable nodes can be chosen
+ */
+ @Override
+ public List<DatanodeDetails> chooseDatanodes(
+ List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes,
+ int nodesRequired, final long sizeRequired) throws SCMException {
+ // get a list of viable nodes based on criteria
+ List<DatanodeDetails> healthyNodes =
+ filterViableNodes(excludedNodes, nodesRequired);
+
+ List<DatanodeDetails> results = new ArrayList<>();
+
+ // Randomly picks nodes when all nodes are equal.
+ // This happens when network topology is absent or
+ // all nodes are on the same rack.
+ if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
+ LOG.info("All nodes are considered equal. Now randomly pick nodes. " +
+ "Required nodes: {}", nodesRequired);
+ results = super.getResultSet(nodesRequired, healthyNodes);
+ if (results.size() < nodesRequired) {
+ LOG.error("Unable to find the required number of healthy nodes that " +
+ "meet the criteria. Required nodes: {}, Found nodes: {}",
+ nodesRequired, results.size());
+ throw new SCMException("Unable to find required number of nodes.",
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+ return results;
+ }
+
+ // Since nodes are widely distributed, the results should be selected
+ // based on distance in topology, rack awareness and load balancing.
+ List<DatanodeDetails> exclude = new ArrayList<>();
+ exclude.addAll(excludedNodes);
+ // First choose an anchor node randomly.
+ DatanodeDetails anchor = chooseNode(healthyNodes);
+ if (anchor == null) {
+ LOG.error("Unable to find the first healthy nodes that " +
+ "meet the criteria. Required nodes: {}, Found nodes: {}",
+ nodesRequired, results.size());
+ throw new SCMException("Unable to find required number of nodes.",
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+
+ results.add(anchor);
+ exclude.add(anchor);
+ nodesRequired--;
+
+ // Choose the second node on a different rack from the anchor.
+ DatanodeDetails nodeOnDifferentRack = chooseNodeBasedOnRackAwareness(
+ healthyNodes, excludedNodes,
+ nodeManager.getClusterNetworkTopologyMap(), anchor);
+ if (nodeOnDifferentRack == null) {
+ LOG.error("Unable to find nodes on different racks that " +
+ "meet the criteria. Required nodes: {}, Found nodes: {}",
+ nodesRequired, results.size());
+ throw new SCMException("Unable to find required number of nodes.",
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+
+ results.add(nodeOnDifferentRack);
+ exclude.add(nodeOnDifferentRack);
+ nodesRequired--;
+
+ // Then choose nodes close to anchor based on network topology
+ for (int x = 0; x < nodesRequired; x++) {
+ // invoke the choose function defined in the derived classes.
+ DatanodeDetails pick = chooseNodeFromNetworkTopology(
+ nodeManager.getClusterNetworkTopologyMap(), anchor, exclude);
+ if (pick != null) {
+ results.add(pick);
+ // exclude the picked node for next time
+ exclude.add(pick);
+ }
+ }
+
+ if (results.size() < nodesRequired) {
+ LOG.error("Unable to find the required number of healthy nodes that " +
+ "meet the criteria. Required nodes: {}, Found nodes: {}",
+ nodesRequired, results.size());
+ throw new SCMException("Unable to find required number of nodes.",
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+ return results;
+ }
+
+ /**
+ * Find a node from the healthy list and return it after removing it from the
+ * list that we are operating on.
+ *
+ * @param healthyNodes - Set of healthy nodes we can choose from.
+ * @return chosen DatanodeDetails
+ */
+ @Override
+ public DatanodeDetails chooseNode(
+ List<DatanodeDetails> healthyNodes) {
+ int firstNodeNdx = getRand().nextInt(healthyNodes.size());
+ int secondNodeNdx = getRand().nextInt(healthyNodes.size());
+
+ DatanodeDetails datanodeDetails;
+ // There is a possibility that both numbers will be the same.
+ // if that is so, we just return the node.
+ if (firstNodeNdx == secondNodeNdx) {
+ datanodeDetails = healthyNodes.get(firstNodeNdx);
+ } else {
+ DatanodeDetails firstNodeDetails = healthyNodes.get(firstNodeNdx);
+ DatanodeDetails secondNodeDetails = healthyNodes.get(secondNodeNdx);
+ SCMNodeMetric firstNodeMetric =
+ nodeManager.getNodeStat(firstNodeDetails);
+ SCMNodeMetric secondNodeMetric =
+ nodeManager.getNodeStat(secondNodeDetails);
+ datanodeDetails = firstNodeMetric.isGreater(secondNodeMetric.get())
+ ? firstNodeDetails : secondNodeDetails;
+ }
+ // the pick is decided and it should be removed from candidates.
+ healthyNodes.remove(datanodeDetails);
+ return datanodeDetails;
+ }
+
+ /**
+ * Choose a node on a different rack from the anchor, based on rack awareness.
+ * If no node on a different rack can be found, return null.
+ * @param healthyNodes healthy nodes
+ * @param excludedNodes excluded nodes
+ * @param networkTopology network topology
+ * @param anchor anchor node
+ * @return a node on a different rack, or null if none can be found
+ */
+ @VisibleForTesting
+ protected DatanodeDetails chooseNodeBasedOnRackAwareness(
+ List<DatanodeDetails> healthyNodes, List<DatanodeDetails> excludedNodes,
+ NetworkTopology networkTopology, DatanodeDetails anchor) {
+ Preconditions.checkArgument(networkTopology != null);
+ if (checkAllNodesAreEqual(networkTopology)) {
+ return null;
+ }
+
+ for (DatanodeDetails node : healthyNodes) {
+ if (excludedNodes.contains(node)
+ || networkTopology.isSameParent(anchor, node)) {
+ continue;
+ } else {
+ // the pick is decided and it should be removed from candidates.
+ healthyNodes.remove(node);
+ return node;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Check if all nodes are equal in topology.
+ * They are equal when network topology is absent or they are on
+ * the same rack.
+ * @param topology network topology
+ * @return true when all nodes are equal
+ */
+ private boolean checkAllNodesAreEqual(NetworkTopology topology) {
+ if (topology == null) {
+ return true;
+ }
+ return (topology.getNumOfNodes(topology.getMaxLevel() - 1) == 1);
+ }
+
+ /**
+ * Choose node based on network topology.
+ * @param networkTopology network topology
+ * @param anchor anchor datanode to start with
+ * @param excludedNodes excluded datanodes
+ * @return chosen datanode
+ */
+ @VisibleForTesting
+ protected DatanodeDetails chooseNodeFromNetworkTopology(
+ NetworkTopology networkTopology, DatanodeDetails anchor,
+ List<DatanodeDetails> excludedNodes) {
+ Preconditions.checkArgument(networkTopology != null);
+
+ Collection<Node> excluded = new ArrayList<>();
+ if (excludedNodes != null && excludedNodes.size() != 0) {
+ excluded.addAll(excludedNodes);
+ }
+ excluded.add(anchor);
+
+ Node pick = networkTopology.chooseRandom(
+ anchor.getNetworkLocation(), excluded);
+ DatanodeDetails pickedNode = (DatanodeDetails) pick;
+ // exclude the picked node for next time
+ if (excludedNodes != null) {
+ excludedNodes.add(pickedNode);
+ }
+ return pickedNode;
+ }
+}
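
Taken together, a rough usage sketch of the policy (patterned on the new unit test below; MockNodeManager, the node count of 10 and the request for 3 datanodes are illustrative assumptions, and the call would sit in a method declared to throw SCMException):

    NodeManager nodeManager = new MockNodeManager(true, 10);
    PipelinePlacementPolicy policy =
        new PipelinePlacementPolicy(nodeManager, new OzoneConfiguration());
    // Ask for 3 datanodes; sizeRequired is not used by this policy.
    List<DatanodeDetails> nodes = policy.chooseDatanodes(
        new ArrayList<>(), new ArrayList<>(), 3, 0);
    assert nodes.size() == 3;
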
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 87cc177..613146d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -16,11 +16,13 @@
*/
package org.apache.hadoop.hdds.scm.container;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.net.NetConstants;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
+import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
@@ -86,7 +88,7 @@ public class MockNodeManager implements NodeManager {
private final SCMNodeStat aggregateStat;
private boolean safemode;
private final Map<UUID, List<SCMCommand>> commandMap;
- private final Node2PipelineMap node2PipelineMap;
+ private Node2PipelineMap node2PipelineMap;
private final Node2ContainerMap node2ContainerMap;
private NetworkTopology clusterMap;
private ConcurrentMap<String, Set<String>> dnsToUuidMap;
@@ -100,6 +102,7 @@ public class MockNodeManager implements NodeManager {
this.node2ContainerMap = new Node2ContainerMap();
this.dnsToUuidMap = new ConcurrentHashMap<>();
aggregateStat = new SCMNodeStat();
+ clusterMap = new NetworkTopologyImpl(new Configuration());
if (initializeFakeNodes) {
for (int x = 0; x < nodeCount; x++) {
DatanodeDetails dd = TestUtils.randomDatanodeDetails();
@@ -251,6 +254,16 @@ public class MockNodeManager implements NodeManager {
}
/**
+ * Get the count of pipelines a datanode is associated with.
+ * @param datanodeDetails DatanodeDetails
+ * @return The number of pipelines
+ */
+ @Override
+ public int getPipelinesCount(DatanodeDetails datanodeDetails) {
+ return node2PipelineMap.getPipelinesCount(datanodeDetails.getUuid());
+ }
+
+ /**
* Add pipeline information in the NodeManager.
* @param pipeline - Pipeline to be added
*/
@@ -260,6 +273,22 @@ public class MockNodeManager implements NodeManager {
}
/**
+ * Get the entire Node2PipelineMap.
+ * @return Node2PipelineMap
+ */
+ public Node2PipelineMap getNode2PipelineMap() {
+ return node2PipelineMap;
+ }
+
+ /**
+ * Set the Node2PipelineMap.
+ * @param node2PipelineMap Node2PipelineMap
+ */
+ public void setNode2PipelineMap(Node2PipelineMap node2PipelineMap) {
+ this.node2PipelineMap = node2PipelineMap;
+ }
+
+ /**
* Remove a pipeline information from the NodeManager.
* @param pipeline - Pipeline to be removed
*/
@@ -517,6 +546,11 @@ public class MockNodeManager implements NodeManager {
return results;
}
+ @Override
+ public NetworkTopology getClusterNetworkTopologyMap() {
+ return clusterMap;
+ }
+
public void setNetworkTopology(NetworkTopology topology) {
this.clusterMap = topology;
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
new file mode 100644
index 0000000..2e0d0b1
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.net.*;
+import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * Test for PipelinePlacementPolicy.
+ */
+public class TestPipelinePlacementPolicy {
+ private MockNodeManager nodeManager;
+ private PipelinePlacementPolicy placementPolicy;
+ private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
+
+ @Before
+ public void init() throws Exception {
+ nodeManager = new MockNodeManager(true,
+ PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+ placementPolicy =
+ new PipelinePlacementPolicy(nodeManager, new OzoneConfiguration());
+ }
+
+ @Test
+ public void testChooseNodeBasedOnNetworkTopology() {
+ List<DatanodeDetails> healthyNodes =
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+ DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
+ // anchor should be removed from healthyNodes after being chosen.
+ Assert.assertFalse(healthyNodes.contains(anchor));
+
+ List<DatanodeDetails> excludedNodes =
+ new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+ DatanodeDetails nextNode = placementPolicy.chooseNodeFromNetworkTopology(
+ nodeManager.getClusterNetworkTopologyMap(), anchor, excludedNodes);
+ // excludedNodes should contain nextNode after being chosen.
+ Assert.assertTrue(excludedNodes.contains(nextNode));
+ // nextNode should not be the same as anchor.
+ Assert.assertTrue(anchor.getUuid() != nextNode.getUuid());
+ }
+
+ @Test
+ public void testChooseNodeBasedOnRackAwareness() {
+ List<DatanodeDetails> healthyNodes = overWriteLocationInNodes(
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY));
+ DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
+ NetworkTopology topologyWithDifRacks =
+ createNetworkTopologyOnDifRacks();
+ DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness(
+ healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+ topologyWithDifRacks, anchor);
+ Assert.assertFalse(topologyWithDifRacks.isSameParent(anchor, nextNode));
+ }
+
+ private final static Node[] NODES = new NodeImpl[] {
+ new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h2", "/r1", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h3", "/r1", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h4", "/r1", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h5", "/r2", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h6", "/r2", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h7", "/r2", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h8", "/r2", NetConstants.NODE_COST_DEFAULT),
+ };
+
+
+ private NetworkTopology createNetworkTopologyOnDifRacks() {
+ NetworkTopology topology = new NetworkTopologyImpl(new Configuration());
+ for (Node n : NODES) {
+ topology.add(n);
+ }
+ return topology;
+ }
+
+ private List<DatanodeDetails> overWriteLocationInNodes(
+ List<DatanodeDetails> datanodes) {
+ List<DatanodeDetails> results = new ArrayList<>(datanodes.size());
+ for (int i = 0; i < datanodes.size(); i++) {
+ DatanodeDetails datanode = datanodes.get(i);
+ DatanodeDetails result = DatanodeDetails.newBuilder()
+ .setUuid(datanode.getUuidString())
+ .setHostName(datanode.getHostName())
+ .setIpAddress(datanode.getIpAddress())
+ .addPort(datanode.getPort(DatanodeDetails.Port.Name.STANDALONE))
+ .addPort(datanode.getPort(DatanodeDetails.Port.Name.RATIS))
+ .addPort(datanode.getPort(DatanodeDetails.Port.Name.REST))
+ .setNetworkLocation(NODES[i].getNetworkLocation()).build();
+ results.add(result);
+ }
+ return results;
+ }
+
+ @Test
+ public void testHeavyNodeShouldBeExcluded() throws SCMException{
+ List<DatanodeDetails> healthyNodes =
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+ int nodesRequired = healthyNodes.size()/2;
+ // only a minority of healthy nodes are heavily engaged in pipelines.
+ int minorityHeavy = healthyNodes.size()/2 - 1;
+ List<DatanodeDetails> pickedNodes1 = placementPolicy.chooseDatanodes(
+ new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+ new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+ nodesRequired, 0);
+ // modify node to pipeline mapping.
+ insertHeavyNodesIntoNodeManager(healthyNodes, minorityHeavy);
+ // Nodes should be sufficient.
+ Assert.assertEquals(nodesRequired, pickedNodes1.size());
+ // make sure pipeline placement policy won't select duplicate nodes.
+ Assert.assertTrue(checkDuplicateNodesUUID(pickedNodes1));
+
+ // majority of healthy nodes are heavily engaged in pipelines.
+ int majorityHeavy = healthyNodes.size()/2 + 2;
+ insertHeavyNodesIntoNodeManager(healthyNodes, majorityHeavy);
+ boolean thrown = false;
+ List<DatanodeDetails> pickedNodes2 = null;
+ try {
+ pickedNodes2 = placementPolicy.chooseDatanodes(
+ new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+ new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+ nodesRequired, 0);
+ } catch (SCMException e) {
+ Assert.assertFalse(thrown);
+ thrown = true;
+ }
+ // Nodes should NOT be sufficient and an exception should be thrown.
+ Assert.assertNull(pickedNodes2);
+ Assert.assertTrue(thrown);
+ }
+
+ private boolean checkDuplicateNodesUUID(List<DatanodeDetails> nodes) {
+ HashSet<UUID> uuids = nodes.stream().
+ map(DatanodeDetails::getUuid).
+ collect(Collectors.toCollection(HashSet::new));
+ return uuids.size() == nodes.size();
+ }
+
+ private Set<PipelineID> mockPipelineIDs(int count) {
+ Set<PipelineID> pipelineIDs = new HashSet<>(count);
+ for (int i = 0; i < count; i++) {
+ pipelineIDs.add(PipelineID.randomId());
+ }
+ return pipelineIDs;
+ }
+
+ private void insertHeavyNodesIntoNodeManager(
+ List<DatanodeDetails> nodes, int heavyNodeCount) throws SCMException{
+ if (nodes == null) {
+ throw new SCMException("",
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+
+ int considerHeavyCount =
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT + 1;
+
+ Node2PipelineMap mockMap = new Node2PipelineMap();
+ for (DatanodeDetails node : nodes) {
+ // mock heavy node
+ if (heavyNodeCount > 0) {
+ mockMap.insertNewDatanode(
+ node.getUuid(), mockPipelineIDs(considerHeavyCount));
+ heavyNodeCount--;
+ } else {
+ mockMap.insertNewDatanode(node.getUuid(), mockPipelineIDs(1));
+ }
+ }
+ nodeManager.setNode2PipelineMap(mockMap);
+ }
+}
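
Note that the engagement check in meetCriteria is inclusive, which is why the test mocks heavy nodes with OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT + 1 pipelines. A small sketch of the boundary, assuming the default limit of 5:

    int limit = ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT; // 5
    // meetCriteria(dn, limit) evaluates nodeManager.getPipelinesCount(dn) <= limit
    boolean viableAtLimit  = 5 <= limit;  // true: a node with 5 pipelines is still viable
    boolean heavyOverLimit = 6 <= limit;  // false: a node with 6 pipelines is filtered out
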
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 0ecff3f..7e8ec52 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -20,6 +20,7 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
@@ -166,6 +167,16 @@ public class ReplicationNodeManagerMock implements NodeManager {
}
/**
+ * Get the count of pipelines a datanode is associated with.
+ * @param dnId DatanodeDetails
+ * @return The number of pipelines
+ */
+ @Override
+ public int getPipelinesCount(DatanodeDetails dnId) {
+ throw new UnsupportedOperationException("Not yet implemented");
+ }
+
+ /**
* Add pipeline information in the NodeManager.
* @param pipeline - Pipeline to be added
*/
@@ -327,4 +338,9 @@ public class ReplicationNodeManagerMock implements NodeManager {
public List<DatanodeDetails> getNodesByAddress(String address) {
return new LinkedList<>();
}
+
+ @Override
+ public NetworkTopology getClusterNetworkTopologyMap() {
+ return null;
+ }
}