You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sa...@apache.org on 2020/02/10 01:14:07 UTC
[hadoop-ozone] branch HDDS-1564 updated: HDDS-2923 Add fall-back
protection for rack awareness in pipeline creation. (#516)
This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-1564 by this push:
new 43845da HDDS-2923 Add fall-back protection for rack awareness in pipeline creation. (#516)
43845da is described below
commit 43845daa30822c9fb259acaafc1748d633e76fc6
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Mon Feb 10 09:13:57 2020 +0800
HDDS-2923 Add fall-back protection for rack awareness in pipeline creation. (#516)
---
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 81 ++++++++++++++++------
.../scm/pipeline/TestPipelinePlacementPolicy.java | 56 +++++++++++++++
2 files changed, 114 insertions(+), 23 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 9d78063..0f30449 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -208,6 +208,29 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
}
}
+ // Fall back: pick a node from nodeSet not in excludedNodes, else throw.
+ DatanodeDetails fallBackPickNodes(
+ List<DatanodeDetails> nodeSet, List<DatanodeDetails> excludedNodes)
+ throws SCMException {
+ DatanodeDetails node;
+ if (excludedNodes == null || excludedNodes.isEmpty()) {
+ node = chooseNode(nodeSet);
+ } else {
+ List<DatanodeDetails> inputNodes = nodeSet.stream()
+ .filter(p -> !excludedNodes.contains(p)).collect(Collectors.toList());
+ node = chooseNode(inputNodes);
+ }
+
+ if (node == null) {
+ String msg = String.format("Unable to find fall back node in" +
+ " pipeline allocation. nodeSet size: %d", nodeSet.size());
+ LOG.warn(msg);
+ throw new SCMException(msg,
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+ return node;
+ }
+
/**
* Get result set based on the pipeline placement algorithm which considers
* network topology and rack awareness.
@@ -220,50 +243,59 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
public List<DatanodeDetails> getResultSet(
int nodesRequired, List<DatanodeDetails> healthyNodes)
throws SCMException {
+ if (nodesRequired != HddsProtos.ReplicationFactor.THREE.getNumber()) {
+ throw new SCMException("Nodes required number is not supported: " +
+ nodesRequired, SCMException.ResultCodes.INVALID_CAPACITY);
+ }
+
+ // Assume rack awareness is not enabled.
+ boolean rackAwareness = false;
List <DatanodeDetails> results = new ArrayList<>(nodesRequired);
// Since nodes are widely distributed, the results should be selected
// base on distance in topology, rack awareness and load balancing.
List<DatanodeDetails> exclude = new ArrayList<>();
// First choose an anchor nodes randomly
DatanodeDetails anchor = chooseNode(healthyNodes);
- if (anchor == null) {
- LOG.warn("Unable to find healthy node for anchor(first) node." +
- " Required nodes: {}, Found nodes: {}",
- nodesRequired, results.size());
- throw new SCMException("Unable to find required number of nodes.",
+ if (anchor != null) {
+ results.add(anchor);
+ exclude.add(anchor);
+ } else {
+ LOG.warn("Unable to find healthy node for anchor(first) node.");
+ throw new SCMException("Unable to find anchor node.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
if (LOG.isDebugEnabled()) {
LOG.debug("First node chosen: {}", anchor);
}
- results.add(anchor);
- exclude.add(anchor);
// Choose the second node on different racks from anchor.
- DatanodeDetails nodeOnDifferentRack = chooseNodeBasedOnRackAwareness(
+ DatanodeDetails nextNode = chooseNodeBasedOnRackAwareness(
healthyNodes, exclude,
nodeManager.getClusterNetworkTopologyMap(), anchor);
- if (nodeOnDifferentRack == null) {
- LOG.warn("Pipeline Placement: Unable to find 2nd node on different " +
- "racks that meets the criteria. Required nodes: {}, Found nodes:" +
- " {}", nodesRequired, results.size());
- throw new SCMException("Unable to find required number of nodes.",
- SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("Second node chosen: {}", nodeOnDifferentRack);
+ if (nextNode != null) {
+ // Rack awareness is detected.
+ rackAwareness = true;
+ results.add(nextNode);
+ exclude.add(nextNode);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Second node chosen: {}", nextNode);
+ }
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Pipeline Placement: Unable to find 2nd node on different " +
+ "rack based on rack awareness.");
+ }
}
- results.add(nodeOnDifferentRack);
- exclude.add(nodeOnDifferentRack);
-
// Then choose nodes close to anchor based on network topology
int nodesToFind = nodesRequired - results.size();
for (int x = 0; x < nodesToFind; x++) {
- // invoke the choose function defined in the derived classes.
- DatanodeDetails pick = chooseNodeFromNetworkTopology(
- nodeManager.getClusterNetworkTopologyMap(), anchor, exclude);
+ // Pick remaining nodes based on the existence of rack awareness.
+ DatanodeDetails pick = rackAwareness
+ ? chooseNodeFromNetworkTopology(
+ nodeManager.getClusterNetworkTopologyMap(), anchor, exclude)
+ : fallBackPickNodes(healthyNodes, exclude);
if (pick != null) {
results.add(pick);
exclude.add(pick);
@@ -293,6 +325,9 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
@Override
public DatanodeDetails chooseNode(
List<DatanodeDetails> healthyNodes) {
+ if (healthyNodes == null || healthyNodes.isEmpty()) {
+ return null;
+ }
int firstNodeNdx = getRand().nextInt(healthyNodes.size());
int secondNodeNdx = getRand().nextInt(healthyNodes.size());
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index b9aa9af..daad808 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -83,10 +83,66 @@ public class TestPipelinePlacementPolicy {
DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness(
healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
topologyWithDifRacks, anchor);
+ Assert.assertNotNull(nextNode);
Assert.assertFalse(anchor.getNetworkLocation().equals(
nextNode.getNetworkLocation()));
}
+ @Test
+ public void testFallBackPickNodes() {
+ List<DatanodeDetails> healthyNodes = overWriteLocationInNodes(
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY));
+ DatanodeDetails node;
+ try {
+ node = placementPolicy.fallBackPickNodes(healthyNodes, null);
+ Assert.assertNotNull(node);
+ } catch (SCMException e) {
+ Assert.fail("Unexpected SCMException: " + e.getMessage());
+ }
+
+ // When every node in the input nodeSet is excluded, expect SCMException.
+ List<DatanodeDetails> exclude = healthyNodes;
+ try {
+ node = placementPolicy.fallBackPickNodes(healthyNodes, exclude);
+ Assert.fail("Expected SCMException when all nodes are excluded.");
+ } catch (SCMException e) {
+ Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
+ e.getResult());
+ } catch (Exception ex) {
+ Assert.fail("Unexpected exception: " + ex);
+ }
+ }
+
+ @Test
+ public void testRackAwarenessNotEnabledWithFallBack() throws SCMException {
+ List<DatanodeDetails> healthyNodes =
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+ DatanodeDetails anchor = placementPolicy.chooseNode(healthyNodes);
+ DatanodeDetails randomNode = placementPolicy.chooseNode(healthyNodes);
+ // Without rack awareness, nodes share the same network location.
+ Assert.assertEquals(anchor.getNetworkLocation(),
+ randomNode.getNetworkLocation());
+
+ NetworkTopology topology = new NetworkTopologyImpl(new Configuration());
+ DatanodeDetails nextNode = placementPolicy.chooseNodeBasedOnRackAwareness(
+ healthyNodes, new ArrayList<>(PIPELINE_PLACEMENT_MAX_NODES_COUNT),
+ topology, anchor);
+ // RackAwareness should not be able to choose any node.
+ Assert.assertNull(nextNode);
+
+ // PlacementPolicy should still be able to pick a set of 3 nodes.
+ int numOfNodes = HddsProtos.ReplicationFactor.THREE.getNumber();
+ List<DatanodeDetails> results = placementPolicy
+ .getResultSet(numOfNodes, healthyNodes);
+
+ Assert.assertEquals(numOfNodes, results.size());
+ // All nodes are on the same rack.
+ Assert.assertEquals(results.get(0).getNetworkLocation(),
+ results.get(1).getNetworkLocation());
+ Assert.assertEquals(results.get(0).getNetworkLocation(),
+ results.get(2).getNetworkLocation());
+ }
+
private final static Node[] NODES = new NodeImpl[] {
new NodeImpl("h1", "/r1", NetConstants.NODE_COST_DEFAULT),
new NodeImpl("h2", "/r1", NetConstants.NODE_COST_DEFAULT),
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org