You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that was not preserved in this plain-text copy.
Posted to commits@ozone.apache.org by so...@apache.org on 2020/03/27 07:30:09 UTC
[hadoop-ozone] branch master updated: HDDS-3179. Pipeline placement
based on Topology does not have fallback (#678)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 7d132ce HDDS-3179. Pipeline placement based on Topology does not have fallback (#678)
7d132ce is described below
commit 7d132ce38d5d8aeb3b72e770f99881888c2753ee
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Fri Mar 27 15:29:59 2020 +0800
HDDS-3179. Pipeline placement based on Topology does not have fallback (#678)
---
.../hadoop/hdds/protocol/DatanodeDetails.java | 2 +-
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 37 ++++--
.../hadoop/hdds/scm/container/MockNodeManager.java | 10 +-
.../scm/pipeline/TestPipelinePlacementPolicy.java | 145 ++++++++++++++++-----
4 files changed, 145 insertions(+), 49 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index a235a4b..28ed36d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -70,7 +70,7 @@ public class DatanodeDetails extends NodeImpl implements
this.certSerialId = certSerialId;
}
- protected DatanodeDetails(DatanodeDetails datanodeDetails) {
+ public DatanodeDetails(DatanodeDetails datanodeDetails) {
super(datanodeDetails.getHostName(), datanodeDetails.getNetworkLocation(),
datanodeDetails.getCost());
this.uuid = datanodeDetails.uuid;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 0f30449..e96b120 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -99,9 +99,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
try {
pipeline = stateManager.getPipeline(pid);
} catch (PipelineNotFoundException e) {
- LOG.error("Pipeline not found in pipeline state manager during" +
- " pipeline creation. PipelineID: " + pid +
- " exception: " + e.getMessage());
+ LOG.debug("Pipeline not found in pipeline state manager during" +
+ " pipeline creation. PipelineID: {}", pid, e);
continue;
}
if (pipeline != null &&
@@ -282,26 +281,32 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
LOG.debug("Second node chosen: {}", nextNode);
}
} else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Pipeline Placement: Unable to find 2nd node on different " +
- "rack based on rack awareness.");
- }
+ LOG.debug("Pipeline Placement: Unable to find 2nd node on different " +
+ "rack based on rack awareness. anchor: {}", anchor);
}
// Then choose nodes close to anchor based on network topology
int nodesToFind = nodesRequired - results.size();
for (int x = 0; x < nodesToFind; x++) {
// Pick remaining nodes based on the existence of rack awareness.
- DatanodeDetails pick = rackAwareness
- ? chooseNodeFromNetworkTopology(
- nodeManager.getClusterNetworkTopologyMap(), anchor, exclude)
- : fallBackPickNodes(healthyNodes, exclude);
+ DatanodeDetails pick = null;
+ if (rackAwareness) {
+ pick = chooseNodeFromNetworkTopology(
+ nodeManager.getClusterNetworkTopologyMap(), anchor, exclude);
+ }
+ // fall back protection
+ if (pick == null) {
+ pick = fallBackPickNodes(healthyNodes, exclude);
+ if (rackAwareness) {
+ LOG.debug("Failed to choose node based on topology. Fallback " +
+ "picks node as: {}", pick);
+ }
+ }
+
if (pick != null) {
results.add(pick);
exclude.add(pick);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Remaining node chosen: {}", pick);
- }
+ LOG.debug("Remaining node chosen: {}", pick);
}
}
@@ -414,6 +419,10 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
Node pick = networkTopology.chooseRandom(
anchor.getNetworkLocation(), excluded);
DatanodeDetails pickedNode = (DatanodeDetails) pick;
+ if (pickedNode == null) {
+ LOG.debug("Pick node is null, excluded nodes {}, anchor {}.",
+ excluded, anchor);
+ }
return pickedNode;
}
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index cbeef7f..f15bfdd 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -94,6 +94,7 @@ public class MockNodeManager implements NodeManager {
private ConcurrentMap<String, Set<String>> dnsToUuidMap;
public MockNodeManager(NetworkTopologyImpl clusterMap,
+ List<DatanodeDetails> nodes,
boolean initializeFakeNodes, int nodeCount) {
this.healthyNodes = new LinkedList<>();
this.staleNodes = new LinkedList<>();
@@ -104,6 +105,13 @@ public class MockNodeManager implements NodeManager {
this.dnsToUuidMap = new ConcurrentHashMap<>();
this.aggregateStat = new SCMNodeStat();
this.clusterMap = clusterMap;
+ if (!nodes.isEmpty()) {
+ for (int x = 0; x < nodes.size(); x++) {
+ DatanodeDetails node = nodes.get(x);
+ register(node, null, null);
+ populateNodeMetric(node, x);
+ }
+ }
if (initializeFakeNodes) {
for (int x = 0; x < nodeCount; x++) {
DatanodeDetails dd = MockDatanodeDetails.randomDatanodeDetails();
@@ -116,7 +124,7 @@ public class MockNodeManager implements NodeManager {
}
public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
- this(new NetworkTopologyImpl(new OzoneConfiguration()),
+ this(new NetworkTopologyImpl(new OzoneConfiguration()), new ArrayList<>(),
initializeFakeNodes, nodeCount);
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index daad808..fafc4b0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -35,6 +36,9 @@ import java.util.*;
import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
+import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
/**
* Test for PipelinePlacementPolicy.
@@ -43,25 +47,55 @@ public class TestPipelinePlacementPolicy {
private MockNodeManager nodeManager;
private OzoneConfiguration conf;
private PipelinePlacementPolicy placementPolicy;
+ private NetworkTopologyImpl cluster;
private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
+ private List<DatanodeDetails> nodesWithOutRackAwareness = new ArrayList<>();
+ private List<DatanodeDetails> nodesWithRackAwareness = new ArrayList<>();
+
@Before
public void init() throws Exception {
- nodeManager = new MockNodeManager(true,
- PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+ cluster = initTopology();
+ // start with nodes with rack awareness.
+ nodeManager = new MockNodeManager(cluster, getNodesWithRackAwareness(),
+ false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
conf = new OzoneConfiguration();
conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 5);
placementPolicy = new PipelinePlacementPolicy(
nodeManager, new PipelineStateManager(), conf);
}
+ private NetworkTopologyImpl initTopology() {
+ NodeSchema[] schemas = new NodeSchema[]
+ {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
+ NodeSchemaManager.getInstance().init(schemas, true);
+ NetworkTopologyImpl topology =
+ new NetworkTopologyImpl(NodeSchemaManager.getInstance());
+ return topology;
+ }
+
+ private List<DatanodeDetails> getNodesWithRackAwareness() {
+ List<DatanodeDetails> datanodes = new ArrayList<>();
+ for (Node node : NODES) {
+ DatanodeDetails datanode = overwriteLocationInNode(
+ getNodesWithoutRackAwareness(), node);
+ nodesWithRackAwareness.add(datanode);
+ datanodes.add(datanode);
+ }
+ return datanodes;
+ }
+
+ private DatanodeDetails getNodesWithoutRackAwareness() {
+ DatanodeDetails node = MockDatanodeDetails.randomDatanodeDetails();
+ nodesWithOutRackAwareness.add(node);
+ return node;
+ }
+
@Test
- public void testChooseNodeBasedOnNetworkTopology() {
- List<DatanodeDetails> healthyNodes =
- nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
- DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
+ public void testChooseNodeBasedOnNetworkTopology() throws SCMException {
+ DatanodeDetails anchor = placementPolicy.chooseNode(nodesWithRackAwareness);
// anchor should be removed from healthyNodes after being chosen.
- Assert.assertFalse(healthyNodes.contains(anchor));
+ Assert.assertFalse(nodesWithRackAwareness.contains(anchor));
List<DatanodeDetails> excludedNodes =
new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT);
@@ -69,11 +103,43 @@ public class TestPipelinePlacementPolicy {
DatanodeDetails nextNode = placementPolicy.chooseNodeFromNetworkTopology(
nodeManager.getClusterNetworkTopologyMap(), anchor, excludedNodes);
Assert.assertFalse(excludedNodes.contains(nextNode));
- // nextNode should not be the same as anchor.
+ // next node should not be the same as anchor.
Assert.assertTrue(anchor.getUuid() != nextNode.getUuid());
+ // next node should be on the same rack based on topology.
+ Assert.assertEquals(anchor.getNetworkLocation(),
+ nextNode.getNetworkLocation());
}
@Test
+ public void testChooseNodeWithSingleNodeRack() throws SCMException {
+ // There is only one node on 3 racks altogether.
+ List<DatanodeDetails> datanodes = new ArrayList<>();
+ for (Node node : SINGLE_NODE_RACK) {
+ DatanodeDetails datanode = overwriteLocationInNode(
+ MockDatanodeDetails.randomDatanodeDetails(), node);
+ datanodes.add(datanode);
+ }
+ MockNodeManager localNodeManager = new MockNodeManager(initTopology(),
+ datanodes, false, datanodes.size());
+ PipelinePlacementPolicy localPlacementPolicy = new PipelinePlacementPolicy(
+ localNodeManager, new PipelineStateManager(), conf);
+ int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
+ List<DatanodeDetails> results = localPlacementPolicy.chooseDatanodes(
+ new ArrayList<>(datanodes.size()),
+ new ArrayList<>(datanodes.size()),
+ nodesRequired, 0);
+
+ Assert.assertEquals(nodesRequired, results.size());
+ // 3 nodes should be on different racks.
+ Assert.assertNotEquals(results.get(0).getNetworkLocation(),
+ results.get(1).getNetworkLocation());
+ Assert.assertNotEquals(results.get(0).getNetworkLocation(),
+ results.get(2).getNetworkLocation());
+ Assert.assertNotEquals(results.get(1).getNetworkLocation(),
+ results.get(2).getNetworkLocation());
+ }
+
+ @Test
public void testChooseNodeBasedOnRackAwareness() {
List<DatanodeDetails> healthyNodes = overWriteLocationInNodes(
nodeManager.getNodes(HddsProtos.NodeState.HEALTHY));
@@ -84,8 +150,9 @@ public class TestPipelinePlacementPolicy {
healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
topologyWithDifRacks, anchor);
Assert.assertNotNull(nextNode);
- Assert.assertFalse(anchor.getNetworkLocation().equals(
- nextNode.getNetworkLocation()));
+ // next node should be on a different rack.
+ Assert.assertNotEquals(anchor.getNetworkLocation(),
+ nextNode.getNetworkLocation());
}
@Test
@@ -115,25 +182,25 @@ public class TestPipelinePlacementPolicy {
@Test
public void testRackAwarenessNotEnabledWithFallBack() throws SCMException{
- List<DatanodeDetails> healthyNodes =
- nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
- DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
- DatanodeDetails randomNode = placementPolicy.chooseNode(healthyNodes);
+ DatanodeDetails anchor = placementPolicy
+ .chooseNode(nodesWithOutRackAwareness);
+ DatanodeDetails randomNode = placementPolicy
+ .chooseNode(nodesWithOutRackAwareness);
// rack awareness is not enabled.
Assert.assertTrue(anchor.getNetworkLocation().equals(
randomNode.getNetworkLocation()));
NetworkTopology topology = new NetworkTopologyImpl(new Configuration());
DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness(
- healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
- topology, anchor);
+ nodesWithOutRackAwareness, new ArrayList<>(
+ PIPELINE_PLACEMENT_MAX_NODES_COUNT), topology, anchor);
// RackAwareness should not be able to choose any node.
Assert.assertNull(nextNode);
// PlacementPolicy should still be able to pick a set of 3 nodes.
int numOfNodes = HddsProtos.ReplicationFactor.THREE.getNumber();
List<DatanodeDetails> results = placementPolicy
- .getResultSet(numOfNodes, healthyNodes);
+ .getResultSet(numOfNodes, nodesWithOutRackAwareness);
Assert.assertEquals(numOfNodes, results.size());
// All nodes are on same rack.
@@ -146,14 +213,20 @@ public class TestPipelinePlacementPolicy {
private final static Node[] NODES = new NodeImpl[] {
new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h2", "/r1", NetConstants.NODE_COST_DEFAULT),
- new NodeImpl("h3", "/r1", NetConstants.NODE_COST_DEFAULT),
- new NodeImpl("h4", "/r1", NetConstants.NODE_COST_DEFAULT),
- new NodeImpl("h5", "/r2", NetConstants.NODE_COST_DEFAULT),
- new NodeImpl("h6", "/r2", NetConstants.NODE_COST_DEFAULT),
- new NodeImpl("h7", "/r2", NetConstants.NODE_COST_DEFAULT),
- new NodeImpl("h8", "/r2", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h3", "/r2", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h4", "/r2", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h5", "/r3", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h6", "/r3", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h7", "/r4", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h8", "/r4", NetConstants.NODE_COST_DEFAULT),
};
+ // 3 racks with single node.
+ private final static Node[] SINGLE_NODE_RACK = new NodeImpl[] {
+ new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h2", "/r2", NetConstants.NODE_COST_DEFAULT),
+ new NodeImpl("h3", "/r3", NetConstants.NODE_COST_DEFAULT)
+ };
private NetworkTopology createNetworkTopologyOnDifRacks() {
NetworkTopology topology = new NetworkTopologyImpl(new Configuration());
@@ -163,20 +236,26 @@ public class TestPipelinePlacementPolicy {
return topology;
}
+ private DatanodeDetails overwriteLocationInNode(
+ DatanodeDetails datanode, Node node) {
+ DatanodeDetails result = DatanodeDetails.newBuilder()
+ .setUuid(datanode.getUuidString())
+ .setHostName(datanode.getHostName())
+ .setIpAddress(datanode.getIpAddress())
+ .addPort(datanode.getPort(DatanodeDetails.Port.Name.STANDALONE))
+ .addPort(datanode.getPort(DatanodeDetails.Port.Name.RATIS))
+ .addPort(datanode.getPort(DatanodeDetails.Port.Name.REST))
+ .setNetworkLocation(node.getNetworkLocation()).build();
+ return result;
+ }
+
private List<DatanodeDetails> overWriteLocationInNodes(
List<DatanodeDetails> datanodes) {
List<DatanodeDetails> results = new ArrayList<>(datanodes.size());
for (int i = 0; i < datanodes.size(); i++) {
- DatanodeDetails datanode = datanodes.get(i);
- DatanodeDetails result = DatanodeDetails.newBuilder()
- .setUuid(datanode.getUuidString())
- .setHostName(datanode.getHostName())
- .setIpAddress(datanode.getIpAddress())
- .addPort(datanode.getPort(DatanodeDetails.Port.Name.STANDALONE))
- .addPort(datanode.getPort(DatanodeDetails.Port.Name.RATIS))
- .addPort(datanode.getPort(DatanodeDetails.Port.Name.REST))
- .setNetworkLocation(NODES[i].getNetworkLocation()).build();
- results.add(result);
+ DatanodeDetails datanode = overwriteLocationInNode(
+ datanodes.get(i), NODES[i]);
+ results.add(datanode);
}
return results;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org