You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2020/04/21 15:15:44 UTC
[hadoop-ozone] branch master updated: HDDS-3139. Pipeline placement
should max out pipeline usage (#668)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new fac26e1 HDDS-3139. Pipeline placement should max out pipeline usage (#668)
fac26e1 is described below
commit fac26e134b6ec6c0cc85e1dc867d66aea82c0b8a
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Tue Apr 21 23:15:32 2020 +0800
HDDS-3139. Pipeline placement should max out pipeline usage (#668)
---
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 134 +++++++++------------
.../scm/pipeline/TestPipelinePlacementPolicy.java | 76 ++++++++++--
2 files changed, 124 insertions(+), 86 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index b6a6858..68fe65d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -25,16 +25,14 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.SCMCommonPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
-import org.apache.hadoop.hdds.scm.net.Node;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -76,19 +74,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
}
- /**
- * Returns true if this node meets the criteria.
- *
- * @param datanodeDetails DatanodeDetails
- * @param nodesRequired nodes required count
- * @return true if we have enough space.
- */
- @VisibleForTesting
- boolean meetCriteria(DatanodeDetails datanodeDetails, int nodesRequired) {
- if (heavyNodeCriteria == 0) {
- // no limit applied.
- return true;
- }
+ int currentPipelineCount(DatanodeDetails datanodeDetails, int nodesRequired) {
+
// Datanodes from pipeline in some states can also be considered available
// for pipeline allocation. Thus the number of these pipeline shall be
// deducted from total heaviness calculation.
@@ -110,21 +97,16 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
pipelineNumDeductable++;
}
}
- boolean meet = (nodeManager.getPipelinesCount(datanodeDetails)
- - pipelineNumDeductable) < heavyNodeCriteria;
- if (!meet && LOG.isDebugEnabled()) {
- LOG.debug("Pipeline Placement: can't place more pipeline on heavy " +
- "datanode： " + datanodeDetails.getUuid().toString() +
- " Heaviness: " + nodeManager.getPipelinesCount(datanodeDetails) +
- " limit: " + heavyNodeCriteria);
- }
- return meet;
+ return pipelines.size() - pipelineNumDeductable;
}
+
+
/**
* Filter out viable nodes based on
* 1. nodes that are healthy
* 2. nodes that are not too heavily engaged in other pipelines
+ * The results are sorted based on pipeline count of each node.
*
* @param excludedNodes - excluded nodes
* @param nodesRequired - number of datanodes required.
@@ -154,8 +136,15 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
// filter nodes that meet the size and pipeline engagement criteria.
// Pipeline placement doesn't take node space left into account.
+ // Sort the DNs by pipeline load.
+ // TODO check if sorting could cause performance issue: HDDS-3466.
List<DatanodeDetails> healthyList = healthyNodes.stream()
- .filter(d -> meetCriteria(d, nodesRequired))
+ .map(d ->
+ new DnWithPipelines(d, currentPipelineCount(d, nodesRequired)))
+ .filter(d ->
+ ((d.getPipelines() < heavyNodeCriteria) || heavyNodeCriteria == 0))
+ .sorted(Comparator.comparingInt(DnWithPipelines::getPipelines))
+ .map(d -> d.getDn())
.collect(Collectors.toList());
if (healthyList.size() < nodesRequired) {
@@ -253,7 +242,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
// Since nodes are widely distributed, the results should be selected
// base on distance in topology, rack awareness and load balancing.
List<DatanodeDetails> exclude = new ArrayList<>();
- // First choose an anchor nodes randomly
+ // First choose an anchor node.
DatanodeDetails anchor = chooseNode(healthyNodes);
if (anchor != null) {
results.add(anchor);
@@ -291,8 +280,9 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
// Pick remaining nodes based on the existence of rack awareness.
DatanodeDetails pick = null;
if (rackAwareness) {
- pick = chooseNodeFromNetworkTopology(
- nodeManager.getClusterNetworkTopologyMap(), anchor, exclude);
+ pick = chooseNodeBasedOnSameRack(
+ healthyNodes, exclude,
+ nodeManager.getClusterNetworkTopologyMap(), anchor);
}
// fall back protection
if (pick == null) {
@@ -333,24 +323,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
if (healthyNodes == null || healthyNodes.isEmpty()) {
return null;
}
- int firstNodeNdx = getRand().nextInt(healthyNodes.size());
- int secondNodeNdx = getRand().nextInt(healthyNodes.size());
-
- DatanodeDetails datanodeDetails;
- // There is a possibility that both numbers will be same.
- // if that is so, we just return the node.
- if (firstNodeNdx == secondNodeNdx) {
- datanodeDetails = healthyNodes.get(firstNodeNdx);
- } else {
- DatanodeDetails firstNodeDetails = healthyNodes.get(firstNodeNdx);
- DatanodeDetails secondNodeDetails = healthyNodes.get(secondNodeNdx);
- SCMNodeMetric firstNodeMetric =
- nodeManager.getNodeStat(firstNodeDetails);
- SCMNodeMetric secondNodeMetric =
- nodeManager.getNodeStat(secondNodeDetails);
- datanodeDetails = firstNodeMetric.isGreater(secondNodeMetric.get())
- ? firstNodeDetails : secondNodeDetails;
- }
+ DatanodeDetails datanodeDetails = healthyNodes.get(0);
healthyNodes.remove(datanodeDetails);
return datanodeDetails;
}
@@ -373,13 +346,31 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
return null;
}
- for (DatanodeDetails node : healthyNodes) {
- if (excludedNodes.contains(node) ||
- anchor.getNetworkLocation().equals(node.getNetworkLocation())) {
- continue;
- } else {
- return node;
- }
+ List<DatanodeDetails> nodesOnOtherRack = healthyNodes.stream().filter(
+ p -> !excludedNodes.contains(p)
+ && !anchor.getNetworkLocation().equals(p.getNetworkLocation()))
+ .collect(Collectors.toList());
+ if (!nodesOnOtherRack.isEmpty()) {
+ return nodesOnOtherRack.get(0);
+ }
+ return null;
+ }
+
+ @VisibleForTesting
+ protected DatanodeDetails chooseNodeBasedOnSameRack(
+ List<DatanodeDetails> healthyNodes, List<DatanodeDetails> excludedNodes,
+ NetworkTopology networkTopology, DatanodeDetails anchor) {
+ Preconditions.checkArgument(networkTopology != null);
+ if (checkAllNodesAreEqual(networkTopology)) {
+ return null;
+ }
+
+ List<DatanodeDetails> nodesOnSameRack = healthyNodes.stream().filter(
+ p -> !excludedNodes.contains(p)
+ && anchor.getNetworkLocation().equals(p.getNetworkLocation()))
+ .collect(Collectors.toList());
+ if (!nodesOnSameRack.isEmpty()) {
+ return nodesOnSameRack.get(0);
}
return null;
}
@@ -398,31 +389,22 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
return (topology.getNumOfNodes(topology.getMaxLevel() - 1) == 1);
}
- /**
- * Choose node based on network topology.
- * @param networkTopology network topology
- * @param anchor anchor datanode to start with
- * @param excludedNodes excluded datanodes
- * @return chosen datanode
- */
- @VisibleForTesting
- protected DatanodeDetails chooseNodeFromNetworkTopology(
- NetworkTopology networkTopology, DatanodeDetails anchor,
- List<DatanodeDetails> excludedNodes) {
- Preconditions.checkArgument(networkTopology != null);
+ private static class DnWithPipelines {
+ private DatanodeDetails dn;
+ private int pipelines;
+
+ DnWithPipelines(DatanodeDetails dn, int pipelines) {
+ this.dn = dn;
+ this.pipelines = pipelines;
+ }
- Collection<Node> excluded = new ArrayList<>();
- if (excludedNodes != null && excludedNodes.size() != 0) {
- excluded.addAll(excludedNodes);
+ public int getPipelines() {
+ return pipelines;
}
- Node pick = networkTopology.chooseRandom(
- anchor.getNetworkLocation(), excluded);
- DatanodeDetails pickedNode = (DatanodeDetails) pick;
- if (pickedNode == null) {
- LOG.debug("Pick node is null, excluded nodes {}, anchor {}.",
- excluded, anchor);
+ public DatanodeDetails getDn() {
+ return dn;
}
- return pickedNode;
}
+
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index b8b8622..426a643 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -41,27 +41,37 @@ import org.apache.hadoop.hdds.scm.net.NodeSchema;
import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
/**
* Test for PipelinePlacementPolicy.
*/
public class TestPipelinePlacementPolicy {
private MockNodeManager nodeManager;
+ private PipelineStateManager stateManager;
private OzoneConfiguration conf;
private PipelinePlacementPolicy placementPolicy;
private NetworkTopologyImpl cluster;
private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
+ private static final int PIPELINE_LOAD_LIMIT = 5;
private List<DatanodeDetails> nodesWithOutRackAwareness = new ArrayList<>();
private List<DatanodeDetails> nodesWithRackAwareness = new ArrayList<>();
+ static final Logger LOG =
+ LoggerFactory.getLogger(TestPipelinePlacementPolicy.class);
+
@Before
public void init() throws Exception {
cluster = initTopology();
@@ -69,9 +79,10 @@ public class TestPipelinePlacementPolicy {
nodeManager = new MockNodeManager(cluster, getNodesWithRackAwareness(),
false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
conf = new OzoneConfiguration();
- conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 5);
+ conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, PIPELINE_LOAD_LIMIT);
+ stateManager = new PipelineStateManager();
placementPolicy = new PipelinePlacementPolicy(
- nodeManager, new PipelineStateManager(), conf);
+ nodeManager, stateManager, conf);
}
private NetworkTopologyImpl initTopology() {
@@ -85,11 +96,14 @@ public class TestPipelinePlacementPolicy {
private List<DatanodeDetails> getNodesWithRackAwareness() {
List<DatanodeDetails> datanodes = new ArrayList<>();
- for (Node node : NODES) {
+ int iter = 0;
+ int delimiter = NODES.length;
+ while (iter < PIPELINE_PLACEMENT_MAX_NODES_COUNT) {
DatanodeDetails datanode = overwriteLocationInNode(
- getNodesWithoutRackAwareness(), node);
+ getNodesWithoutRackAwareness(), NODES[iter % delimiter]);
nodesWithRackAwareness.add(datanode);
datanodes.add(datanode);
+ iter++;
}
return datanodes;
}
@@ -101,7 +115,7 @@ public class TestPipelinePlacementPolicy {
}
@Test
- public void testChooseNodeBasedOnNetworkTopology() throws SCMException {
+ public void testChooseNodeBasedOnNetworkTopology() {
DatanodeDetails anchor = placementPolicy.chooseNode(nodesWithRackAwareness);
// anchor should be removed from healthyNodes after being chosen.
Assert.assertFalse(nodesWithRackAwareness.contains(anchor));
@@ -109,8 +123,11 @@ public class TestPipelinePlacementPolicy {
List<DatanodeDetails> excludedNodes =
new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT);
excludedNodes.add(anchor);
- DatanodeDetails nextNode = placementPolicy.chooseNodeFromNetworkTopology(
- nodeManager.getClusterNetworkTopologyMap(), anchor, excludedNodes);
+ DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnSameRack(
+ nodesWithRackAwareness, excludedNodes,
+ nodeManager.getClusterNetworkTopologyMap(), anchor);
+ //DatanodeDetails nextNode = placementPolicy.chooseNodeFromNetworkTopology(
+ // nodeManager.getClusterNetworkTopologyMap(), anchor, excludedNodes);
Assert.assertFalse(excludedNodes.contains(nextNode));
// next node should not be the same as anchor.
Assert.assertTrue(anchor.getUuid() != nextNode.getUuid());
@@ -149,6 +166,45 @@ public class TestPipelinePlacementPolicy {
}
@Test
+ public void testPickLowestLoadAnchor() throws IOException{
+ List<DatanodeDetails> healthyNodes = nodeManager
+ .getNodes(HddsProtos.NodeState.HEALTHY);
+
+ int maxPipelineCount = PIPELINE_LOAD_LIMIT * healthyNodes.size()
+ / HddsProtos.ReplicationFactor.THREE.getNumber();
+ for (int i = 0; i < maxPipelineCount; i++) {
+ try {
+ List<DatanodeDetails> nodes = placementPolicy.chooseDatanodes(null,
+ null, HddsProtos.ReplicationFactor.THREE.getNumber(), 0);
+
+ Pipeline pipeline = Pipeline.newBuilder()
+ .setId(PipelineID.randomId())
+ .setState(Pipeline.PipelineState.ALLOCATED)
+ .setType(HddsProtos.ReplicationType.RATIS)
+ .setFactor(HddsProtos.ReplicationFactor.THREE)
+ .setNodes(nodes)
+ .build();
+ nodeManager.addPipeline(pipeline);
+ stateManager.addPipeline(pipeline);
+ } catch (SCMException e) {
+ break;
+ }
+ }
+
+ // Every node should be evenly used.
+ int averageLoadOnNode = maxPipelineCount *
+ HddsProtos.ReplicationFactor.THREE.getNumber() / healthyNodes.size();
+ for (DatanodeDetails node : healthyNodes) {
+ Assert.assertTrue(nodeManager.getPipelinesCount(node)
+ >= averageLoadOnNode);
+ }
+
+ // Should max out pipeline usage.
+ Assert.assertEquals(maxPipelineCount,
+ stateManager.getPipelines(HddsProtos.ReplicationType.RATIS).size());
+ }
+
+ @Test
public void testChooseNodeBasedOnRackAwareness() {
List<DatanodeDetails> healthyNodes = overWriteLocationInNodes(
nodeManager.getNodes(HddsProtos.NodeState.HEALTHY));
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org