You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sa...@apache.org on 2020/02/10 02:23:29 UTC
[hadoop-ozone] 11/18: HDDS-1574 Average out pipeline allocation on
datanodes and add metrcs/test (#291)
This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit d6b9ec0a5667322f2eb2c820953684c9f6ab1cc1
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Thu Dec 19 06:00:35 2019 +0800
HDDS-1574 Average out pipeline allocation on datanodes and add metrcs/test (#291)
---
.../apache/hadoop/hdds/scm/pipeline/Pipeline.java | 19 +++
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 3 +-
.../hdds/scm/pipeline/PipelineStateManager.java | 7 ++
.../hdds/scm/pipeline/RatisPipelineProvider.java | 8 ++
.../hdds/scm/pipeline/RatisPipelineUtils.java | 38 +++++-
.../hdds/scm/pipeline/SCMPipelineManager.java | 12 ++
.../hdds/scm/pipeline/SCMPipelineMetrics.java | 9 ++
.../hadoop/hdds/scm/container/MockNodeManager.java | 14 ++-
.../container/TestCloseContainerEventHandler.java | 7 +-
.../scm/pipeline/MockRatisPipelineProvider.java | 4 +
.../TestPipelineDatanodesIntersection.java | 129 +++++++++++++++++++++
.../scm/pipeline/TestRatisPipelineProvider.java | 73 ++++++++++--
.../hdds/scm/pipeline/TestSCMPipelineManager.java | 15 ++-
.../testutils/ReplicationNodeManagerMock.java | 4 +-
14 files changed, 324 insertions(+), 18 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 295156d..1dc2373 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -58,6 +58,8 @@ public final class Pipeline {
private UUID leaderId;
// Timestamp for pipeline upon creation
private Long creationTimestamp;
+ // Only valid for Ratis THREE pipelines. Does not need to be persisted.
+ private int nodeIdsHash;
/**
* The immutable properties of pipeline object is used in
@@ -73,6 +75,7 @@ public final class Pipeline {
this.state = state;
this.nodeStatus = nodeStatus;
this.creationTimestamp = System.currentTimeMillis();
+ this.nodeIdsHash = 0;
}
/**
@@ -129,6 +132,14 @@ public final class Pipeline {
this.creationTimestamp = creationTimestamp;
}
+ public int getNodeIdsHash() {
+ return nodeIdsHash;
+ }
+
+ void setNodeIdsHash(int nodeIdsHash) {
+ this.nodeIdsHash = nodeIdsHash;
+ }
+
/**
* Return the pipeline leader's UUID.
*
@@ -347,6 +358,7 @@ public final class Pipeline {
private List<DatanodeDetails> nodesInOrder = null;
private UUID leaderId = null;
private Long creationTimestamp = null;
+ private int nodeIdsHash = 0;
public Builder() {}
@@ -359,6 +371,7 @@ public final class Pipeline {
this.nodesInOrder = pipeline.nodesInOrder.get();
this.leaderId = pipeline.getLeaderId();
this.creationTimestamp = pipeline.getCreationTimestamp();
+ this.nodeIdsHash = 0;
}
public Builder setId(PipelineID id1) {
@@ -397,6 +410,11 @@ public final class Pipeline {
return this;
}
+ public Builder setNodeIdsHash(int nodeIdsHash1) {
+ this.nodeIdsHash = nodeIdsHash1;
+ return this;
+ }
+
public Pipeline build() {
Preconditions.checkNotNull(id);
Preconditions.checkNotNull(type);
@@ -405,6 +423,7 @@ public final class Pipeline {
Preconditions.checkNotNull(nodeStatus);
Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
pipeline.setLeaderId(leaderId);
+ pipeline.setNodeIdsHash(nodeIdsHash);
// overwrite with original creationTimestamp
if (creationTimestamp != null) {
pipeline.setCreationTimestamp(creationTimestamp);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 23eb574..bc65d14 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -162,7 +162,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
// filter nodes that meet the size and pipeline engagement criteria.
// Pipeline placement doesn't take node space left into account.
List<DatanodeDetails> healthyList = healthyNodes.stream()
- .filter(d -> meetCriteria(d, nodesRequired)).limit(nodesRequired)
+ .filter(d -> meetCriteria(d, nodesRequired))
.collect(Collectors.toList());
if (healthyList.size() < nodesRequired) {
@@ -308,6 +308,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
}
// the pick is decided and it should be removed from candidates.
healthyNodes.remove(datanodeDetails);
+
return datanodeDetails;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index bb56a03..051202b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -132,6 +132,13 @@ public class PipelineStateManager {
pipeline = pipelineStateMap
.updatePipelineState(pipelineId, PipelineState.OPEN);
}
+ // Amend nodeIdsHash if needed.
+ if (pipeline.getType() == ReplicationType.RATIS &&
+ pipeline.getFactor() == ReplicationFactor.THREE &&
+ pipeline.getNodeIdsHash() == 0) {
+ pipeline.setNodeIdsHash(RatisPipelineUtils
+ .encodeNodeIdsOfFactorThreePipeline(pipeline.getNodes()));
+ }
return pipeline;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 23b02ed..9585907 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -157,6 +157,7 @@ public class RatisPipelineProvider implements PipelineProvider {
}
List<DatanodeDetails> dns;
+ int nodeIdHash = 0;
switch(factor) {
case ONE:
@@ -165,6 +166,7 @@ public class RatisPipelineProvider implements PipelineProvider {
case THREE:
dns = placementPolicy.chooseDatanodes(null,
null, factor.getNumber(), 0);
+ nodeIdHash = RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(dns);
break;
default:
throw new IllegalStateException("Unknown factor: " + factor.name());
@@ -176,6 +178,7 @@ public class RatisPipelineProvider implements PipelineProvider {
.setType(ReplicationType.RATIS)
.setFactor(factor)
.setNodes(dns)
+ .setNodeIdsHash(nodeIdHash)
.build();
// Send command to datanodes to create pipeline
@@ -196,12 +199,17 @@ public class RatisPipelineProvider implements PipelineProvider {
@Override
public Pipeline create(ReplicationFactor factor,
List<DatanodeDetails> nodes) {
+ int nodeIdHash = 0;
+ if (factor == ReplicationFactor.THREE) {
+ nodeIdHash = RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes);
+ }
return Pipeline.newBuilder()
.setId(PipelineID.randomId())
.setState(PipelineState.ALLOCATED)
.setType(ReplicationType.RATIS)
.setFactor(factor)
.setNodes(nodes)
+ .setNodeIdsHash(nodeIdHash)
.build();
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index b8cdf06..f9f2011 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -18,9 +18,12 @@
package org.apache.hadoop.hdds.scm.pipeline;
import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.ratis.RatisHelper;
import org.apache.ratis.client.RaftClient;
@@ -33,7 +36,6 @@ import org.apache.ratis.rpc.SupportedRpcType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
/**
* Utility class for Ratis pipelines. Contains methods to create and destroy
* ratis pipelines.
@@ -93,4 +95,38 @@ public final class RatisPipelineUtils {
true, p.getId());
}
}
+
+ static int encodeNodeIdsOfFactorThreePipeline(List<DatanodeDetails> nodes) {
+ if (nodes.size() != HddsProtos.ReplicationFactor.THREE.getNumber()) {
+ return 0;
+ }
+ return nodes.get(0).getUuid().hashCode() ^
+ nodes.get(1).getUuid().hashCode() ^
+ nodes.get(2).getUuid().hashCode();
+ }
+
+ /**
+ * Return the first existing pipeline that shares the same set of datanodes
+ * with the input pipeline.
+ * @param stateManager PipelineStateManager
+ * @param pipeline input pipeline
+ * @return first matched pipeline
+ */
+ static Pipeline checkPipelineContainSameDatanodes(
+ PipelineStateManager stateManager, Pipeline pipeline) {
+ List<Pipeline> matchedPipelines = stateManager.getPipelines(
+ HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE)
+ .stream().filter(p -> !p.getId().equals(pipeline.getId()) &&
+ (// For all OPEN or ALLOCATED pipelines
+ p.getPipelineState() == Pipeline.PipelineState.OPEN ||
+ p.getPipelineState() == Pipeline.PipelineState.ALLOCATED) &&
+ p.getNodeIdsHash() == pipeline.getNodeIdsHash())
+ .collect(Collectors.toList());
+ if (matchedPipelines.size() == 0) {
+ return null;
+ } else {
+ return matchedPipelines.stream().findFirst().get();
+ }
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 01af465..11e9916 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -172,6 +172,18 @@ public class SCMPipelineManager implements PipelineManager {
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
}
+ Pipeline overlapPipeline = RatisPipelineUtils
+ .checkPipelineContainSameDatanodes(stateManager, pipeline);
+ if (overlapPipeline != null) {
+ metrics.incNumPipelineContainSameDatanodes();
+ //TODO remove once pipeline allocation is proved to be evenly distributed.
+ LOG.info("Pipeline: " + pipeline.getId().toString() +
+ " contains same datanodes as previous pipeline: " +
+ overlapPipeline.getId().toString() + " nodeIds: " +
+ pipeline.getNodes().get(0).getUuid().toString() +
+ ", " + pipeline.getNodes().get(1).getUuid().toString() +
+ ", " + pipeline.getNodes().get(2).getUuid().toString());
+ }
return pipeline;
} catch (IOException ex) {
metrics.incNumPipelineCreationFailed();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index 8c348ed..1cf8d3a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -54,6 +54,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
private @Metric MutableCounterLong numPipelineDestroyFailed;
private @Metric MutableCounterLong numPipelineReportProcessed;
private @Metric MutableCounterLong numPipelineReportProcessingFailed;
+ private @Metric MutableCounterLong numPipelineContainSameDatanodes;
private Map<PipelineID, MutableCounterLong> numBlocksAllocated;
/** Private constructor. */
@@ -92,6 +93,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
numPipelineDestroyFailed.snapshot(recordBuilder, true);
numPipelineReportProcessed.snapshot(recordBuilder, true);
numPipelineReportProcessingFailed.snapshot(recordBuilder, true);
+ numPipelineContainSameDatanodes.snapshot(recordBuilder, true);
numBlocksAllocated
.forEach((pid, metric) -> metric.snapshot(recordBuilder, true));
}
@@ -176,4 +178,11 @@ public final class SCMPipelineMetrics implements MetricsSource {
void incNumPipelineReportProcessingFailed() {
numPipelineReportProcessingFailed.incr();
}
+
+ /**
+ * Increments the number of pipelines that contain the same set of datanodes.
+ */
+ void incNumPipelineContainSameDatanodes() {
+ numPipelineContainSameDatanodes.incr();
+ }
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index bca4189..cbeef7f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -17,7 +17,7 @@
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.scm.net.NetConstants;
@@ -93,7 +93,8 @@ public class MockNodeManager implements NodeManager {
private NetworkTopology clusterMap;
private ConcurrentMap<String, Set<String>> dnsToUuidMap;
- public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
+ public MockNodeManager(NetworkTopologyImpl clusterMap,
+ boolean initializeFakeNodes, int nodeCount) {
this.healthyNodes = new LinkedList<>();
this.staleNodes = new LinkedList<>();
this.deadNodes = new LinkedList<>();
@@ -101,8 +102,8 @@ public class MockNodeManager implements NodeManager {
this.node2PipelineMap = new Node2PipelineMap();
this.node2ContainerMap = new Node2ContainerMap();
this.dnsToUuidMap = new ConcurrentHashMap<>();
- aggregateStat = new SCMNodeStat();
- clusterMap = new NetworkTopologyImpl(new Configuration());
+ this.aggregateStat = new SCMNodeStat();
+ this.clusterMap = clusterMap;
if (initializeFakeNodes) {
for (int x = 0; x < nodeCount; x++) {
DatanodeDetails dd = MockDatanodeDetails.randomDatanodeDetails();
@@ -114,6 +115,11 @@ public class MockNodeManager implements NodeManager {
this.commandMap = new HashMap<>();
}
+ public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
+ this(new NetworkTopologyImpl(new OzoneConfiguration()),
+ initializeFakeNodes, nodeCount);
+ }
+
/**
* Invoked from ctor to create some node Metrics.
*
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index 612bf5d..f35bfe2 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
@@ -67,13 +68,14 @@ public class TestCloseContainerEventHandler {
.getTestDir(TestCloseContainerEventHandler.class.getSimpleName());
configuration
.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+ configuration.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT, 16);
nodeManager = new MockNodeManager(true, 10);
eventQueue = new EventQueue();
pipelineManager =
new SCMPipelineManager(configuration, nodeManager, eventQueue);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
- pipelineManager.getStateManager(), configuration);
+ pipelineManager.getStateManager(), configuration, eventQueue);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
containerManager = new SCMContainerManager(configuration, pipelineManager);
@@ -91,6 +93,9 @@ public class TestCloseContainerEventHandler {
if (containerManager != null) {
containerManager.close();
}
+ if (pipelineManager != null) {
+ pipelineManager.close();
+ }
FileUtil.fullyDelete(testDir);
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index ff52470..3eb146a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -73,6 +73,8 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
.setType(initialPipeline.getType())
.setFactor(factor)
.setNodes(initialPipeline.getNodes())
+ .setNodeIdsHash(RatisPipelineUtils
+ .encodeNodeIdsOfFactorThreePipeline(initialPipeline.getNodes()))
.build();
}
}
@@ -91,6 +93,8 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
.setType(HddsProtos.ReplicationType.RATIS)
.setFactor(factor)
.setNodes(nodes)
+ .setNodeIdsHash(RatisPipelineUtils
+ .encodeNodeIdsOfFactorThreePipeline(nodes))
.build();
}
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
new file mode 100644
index 0000000..45f85ef
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE;
+
+/**
+ * Test for pipeline datanodes intersection.
+ */
+@RunWith(Parameterized.class)
+public class TestPipelineDatanodesIntersection {
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TestPipelineDatanodesIntersection.class.getName());
+
+ private int nodeCount;
+ private int nodeHeaviness;
+ private OzoneConfiguration conf;
+ private boolean end;
+
+ @Before
+ public void initialize() {
+ conf = new OzoneConfiguration();
+ end = false;
+ }
+
+ public TestPipelineDatanodesIntersection(int nodeCount, int nodeHeaviness) {
+ this.nodeCount = nodeCount;
+ this.nodeHeaviness = nodeHeaviness;
+ }
+
+ @Parameterized.Parameters
+ public static Collection inputParams() {
+ return Arrays.asList(new Object[][] {
+ {4, 5},
+ {10, 5},
+ {20, 5},
+ {50, 5},
+ {100, 5},
+ {100, 10}
+ });
+ }
+
+ @Test
+ public void testPipelineDatanodesIntersection() {
+ NodeManager nodeManager= new MockNodeManager(true, nodeCount);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, nodeHeaviness);
+ conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false);
+ PipelineStateManager stateManager = new PipelineStateManager(conf);
+ PipelineProvider provider = new MockRatisPipelineProvider(nodeManager,
+ stateManager, conf);
+
+ int healthyNodeCount = nodeManager
+ .getNodeCount(HddsProtos.NodeState.HEALTHY);
+ int intersectionCount = 0;
+ int createdPipelineCount = 0;
+ while (!end && createdPipelineCount <= healthyNodeCount * nodeHeaviness) {
+ try {
+ Pipeline pipeline = provider.create(HddsProtos.ReplicationFactor.THREE);
+ stateManager.addPipeline(pipeline);
+ nodeManager.addPipeline(pipeline);
+ Pipeline overlapPipeline = RatisPipelineUtils
+ .checkPipelineContainSameDatanodes(stateManager, pipeline);
+ if (overlapPipeline != null){
+ intersectionCount++;
+ LOG.info("This pipeline: " + pipeline.getId().toString() +
+ " overlaps with previous pipeline: " + overlapPipeline.getId() +
+ ". They share same set of datanodes as: " +
+ pipeline.getNodesInOrder().get(0).getUuid() + "/" +
+ pipeline.getNodesInOrder().get(1).getUuid() + "/" +
+ pipeline.getNodesInOrder().get(2).getUuid() + " and " +
+ overlapPipeline.getNodesInOrder().get(0).getUuid() + "/" +
+ overlapPipeline.getNodesInOrder().get(1).getUuid() + "/" +
+ overlapPipeline.getNodesInOrder().get(2).getUuid() +
+ " is the same.");
+ }
+ createdPipelineCount++;
+ } catch(SCMException e) {
+ end = true;
+ } catch (IOException e) {
+ end = true;
+ // Should not throw regular IOException.
+ Assert.fail();
+ }
+ }
+
+ end = false;
+
+ LOG.info("Among total " +
+ stateManager.getPipelines(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE).size() + " created pipelines" +
+ " with " + healthyNodeCount + " healthy datanodes and " +
+ nodeHeaviness + " as node heaviness, " +
+ intersectionCount + " pipelines has same set of datanodes.");
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 66991e4..46fd8c8 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
@@ -34,9 +35,14 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
import static org.apache.commons.collections.CollectionUtils.intersection;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
/**
@@ -50,12 +56,13 @@ public class TestRatisPipelineProvider {
private NodeManager nodeManager;
private PipelineProvider provider;
private PipelineStateManager stateManager;
+ private OzoneConfiguration conf;
@Before
public void init() throws Exception {
nodeManager = new MockNodeManager(true, 10);
OzoneConfiguration conf = new OzoneConfiguration();
- conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
stateManager = new PipelineStateManager();
provider = new MockRatisPipelineProvider(nodeManager,
stateManager, conf);
@@ -75,8 +82,12 @@ public class TestRatisPipelineProvider {
// New pipeline should not overlap with the previous created pipeline
assertTrue(
intersection(pipeline.getNodes(), pipeline1.getNodes())
- .isEmpty());
+ .size() < factor.getNumber());
+ if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) {
+ assertNotEquals(pipeline.getNodeIdsHash(), pipeline1.getNodeIdsHash());
+ }
stateManager.addPipeline(pipeline1);
+ nodeManager.addPipeline(pipeline1);
}
@Test
@@ -92,10 +103,9 @@ public class TestRatisPipelineProvider {
assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE,
Pipeline.PipelineState.ALLOCATED);
stateManager.addPipeline(pipeline1);
- // New pipeline should overlap with the previous created pipeline,
- // and one datanode should overlap between the two types.
- assertEquals(1,
- intersection(pipeline.getNodes(), pipeline1.getNodes()).size());
+ // With enough pipeline quota on datanodes, they should not share
+ // the same set of datanodes.
+ assertNotEquals(pipeline.getNodeIdsHash(), pipeline1.getNodeIdsHash());
}
@Test
@@ -131,6 +141,49 @@ public class TestRatisPipelineProvider {
}
@Test
+ public void testComputeNodeIdsHash() {
+ int total = HddsProtos.ReplicationFactor.THREE.getNumber();
+ List<DatanodeDetails> nodes1 = new ArrayList<>();
+ for (int i = 0; i < total; i++) {
+ nodes1.add(MockDatanodeDetails.createDatanodeDetails(
+ UUID.fromString("00000-11000-00000-00000-0000" + (i + 1))));
+ }
+
+ Assert.assertEquals(total, nodes1.size());
+ Assert.assertNotEquals(0,
+ RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes1));
+
+ List<DatanodeDetails> nodes2 = new ArrayList<>();
+ for (int i = 0; i < total; i++) {
+ nodes2.add(MockDatanodeDetails.createDatanodeDetails(
+ UUID.fromString("00000-11000-00000-00000-0000" + (total - i))));
+ }
+ Assert.assertEquals(total, nodes2.size());
+ Assert.assertNotEquals(0,
+ RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes2));
+
+ Assert.assertEquals(
+ RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes1),
+ RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes2));
+ }
+
+ @Test
+ public void testCreateFactorTHREEPipelineWithSameDatanodes() {
+ List<DatanodeDetails> healthyNodes = nodeManager
+ .getNodes(HddsProtos.NodeState.HEALTHY).stream()
+ .limit(3).collect(Collectors.toList());
+
+ Pipeline pipeline1 = provider.create(
+ HddsProtos.ReplicationFactor.THREE, healthyNodes);
+ Pipeline pipeline2 = provider.create(
+ HddsProtos.ReplicationFactor.THREE, healthyNodes);
+
+ Assert.assertTrue(pipeline1.getNodes().parallelStream()
+ .allMatch(pipeline2.getNodes()::contains));
+ Assert.assertEquals(pipeline1.getNodeIdsHash(), pipeline2.getNodeIdsHash());
+ }
+
+ @Test
public void testCreatePipelinesDnExclude() throws IOException {
List<DatanodeDetails> healthyNodes =
nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
@@ -141,7 +194,11 @@ public class TestRatisPipelineProvider {
// Use up first 3 DNs for an open pipeline.
List<DatanodeDetails> dns = healthyNodes.subList(0, 3);
- addPipeline(dns, factor, Pipeline.PipelineState.OPEN, REPLICATION_TYPE);
+ for (int i = 0; i < conf.getInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+ OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT); i++) {
+ // Saturate pipeline counts on all of the first 3 DNs.
+ addPipeline(dns, factor, Pipeline.PipelineState.OPEN, REPLICATION_TYPE);
+ }
Set<DatanodeDetails> membersOfOpenPipelines = new HashSet<>(dns);
// Use up next 3 DNs for a closed pipeline.
@@ -160,7 +217,7 @@ public class TestRatisPipelineProvider {
List<DatanodeDetails> nodes = pipeline.getNodes();
assertTrue(
- "nodes of new pipeline cannot be from open pipelines",
+ "nodes of new pipeline cannot be all from open pipelines",
nodes.stream().noneMatch(membersOfOpenPipelines::contains));
assertTrue(
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 491e289..e6bf7a0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.pipeline;
+import static org.apache.commons.collections.CollectionUtils.intersection;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
@@ -30,6 +31,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
@@ -89,8 +91,10 @@ public class TestSCMPipelineManager {
pipelineManager.getStateManager(), conf);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
+ int pipelineNum = 5;
+
Set<Pipeline> pipelines = new HashSet<>();
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < pipelineNum; i++) {
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
@@ -112,6 +116,15 @@ public class TestSCMPipelineManager {
List<Pipeline> pipelineList =
pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipelines, new HashSet<>(pipelineList));
+ // All NodeIdsHash from original pipeline list
+ List<Integer> originalPipelineHash = pipelineList.stream()
+ .map(Pipeline::getNodeIdsHash).collect(Collectors.toList());
+ // All NodeIdsHash from reloaded pipeline list
+ List<Integer> reloadedPipelineHash = pipelines.stream()
+ .map(Pipeline::getNodeIdsHash).collect(Collectors.toList());
+ // Original NodeIdsHash list should contain same items from reloaded one.
+ Assert.assertEquals(pipelineNum,
+ intersection(originalPipelineHash, reloadedPipelineHash).size());
// clean up
for (Pipeline pipeline : pipelines) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 7e8ec52..0698443 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -168,11 +168,11 @@ public class ReplicationNodeManagerMock implements NodeManager {
/**
* Get the count of pipelines a datanodes is associated with.
- * @param dnId DatanodeDetails
+ * @param dn DatanodeDetails
* @return The number of pipelines
*/
@Override
- public int getPipelinesCount(DatanodeDetails dnId) {
+ public int getPipelinesCount(DatanodeDetails dn) {
throw new UnsupportedOperationException("Not yet implemented");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org