Posted to commits@ozone.apache.org by sa...@apache.org on 2020/02/10 02:23:29 UTC

[hadoop-ozone] 11/18: HDDS-1574 Average out pipeline allocation on datanodes and add metrics/test (#291)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit d6b9ec0a5667322f2eb2c820953684c9f6ab1cc1
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Thu Dec 19 06:00:35 2019 +0800

    HDDS-1574 Average out pipeline allocation on datanodes and add metrics/test (#291)
---
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  |  19 +++
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |   3 +-
 .../hdds/scm/pipeline/PipelineStateManager.java    |   7 ++
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |   8 ++
 .../hdds/scm/pipeline/RatisPipelineUtils.java      |  38 +++++-
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  12 ++
 .../hdds/scm/pipeline/SCMPipelineMetrics.java      |   9 ++
 .../hadoop/hdds/scm/container/MockNodeManager.java |  14 ++-
 .../container/TestCloseContainerEventHandler.java  |   7 +-
 .../scm/pipeline/MockRatisPipelineProvider.java    |   4 +
 .../TestPipelineDatanodesIntersection.java         | 129 +++++++++++++++++++++
 .../scm/pipeline/TestRatisPipelineProvider.java    |  73 ++++++++++--
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |  15 ++-
 .../testutils/ReplicationNodeManagerMock.java      |   4 +-
 14 files changed, 324 insertions(+), 18 deletions(-)
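
A note on the approach: the heart of the change is a small fingerprint of a
pipeline's datanode set, computed as the XOR of the three datanode UUID hash
codes. Since XOR is commutative and associative, the fingerprint is
independent of node order, so two pipelines over the same three datanodes
always carry the same value. A minimal standalone sketch of the idea (class
and method names here are illustrative, not from the patch):

    import java.util.Arrays;
    import java.util.List;
    import java.util.UUID;

    public class NodeSetFingerprint {

      // XOR the UUID hash codes of exactly three nodes; any other size
      // yields 0, mirroring the factor-THREE-only behavior of the patch.
      static int encode(List<UUID> nodeIds) {
        if (nodeIds.size() != 3) {
          return 0;
        }
        return nodeIds.get(0).hashCode()
            ^ nodeIds.get(1).hashCode()
            ^ nodeIds.get(2).hashCode();
      }

      public static void main(String[] args) {
        UUID a = UUID.randomUUID();
        UUID b = UUID.randomUUID();
        UUID c = UUID.randomUUID();
        // Same set, different order: fingerprints match.
        System.out.println(
            encode(Arrays.asList(a, b, c)) == encode(Arrays.asList(c, a, b)));
      }
    }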

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 295156d..1dc2373 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -58,6 +58,8 @@ public final class Pipeline {
   private UUID leaderId;
   // Timestamp for pipeline upon creation
   private Long creationTimestamp;
+  // Only valid for Ratis THREE pipelines. No need to persist.
+  private int nodeIdsHash;
 
   /**
    * The immutable properties of pipeline object is used in
@@ -73,6 +75,7 @@ public final class Pipeline {
     this.state = state;
     this.nodeStatus = nodeStatus;
     this.creationTimestamp = System.currentTimeMillis();
+    this.nodeIdsHash = 0;
   }
 
   /**
@@ -129,6 +132,14 @@ public final class Pipeline {
     this.creationTimestamp = creationTimestamp;
   }
 
+  public int getNodeIdsHash() {
+    return nodeIdsHash;
+  }
+
+  void setNodeIdsHash(int nodeIdsHash) {
+    this.nodeIdsHash = nodeIdsHash;
+  }
+
   /**
    * Return the pipeline leader's UUID.
    *
@@ -347,6 +358,7 @@ public final class Pipeline {
     private List<DatanodeDetails> nodesInOrder = null;
     private UUID leaderId = null;
     private Long creationTimestamp = null;
+    private int nodeIdsHash = 0;
 
     public Builder() {}
 
@@ -359,6 +371,7 @@ public final class Pipeline {
       this.nodesInOrder = pipeline.nodesInOrder.get();
       this.leaderId = pipeline.getLeaderId();
       this.creationTimestamp = pipeline.getCreationTimestamp();
+      this.nodeIdsHash = 0;
     }
 
     public Builder setId(PipelineID id1) {
@@ -397,6 +410,11 @@ public final class Pipeline {
       return this;
     }
 
+    public Builder setNodeIdsHash(int nodeIdsHash1) {
+      this.nodeIdsHash = nodeIdsHash1;
+      return this;
+    }
+
     public Pipeline build() {
       Preconditions.checkNotNull(id);
       Preconditions.checkNotNull(type);
@@ -405,6 +423,7 @@ public final class Pipeline {
       Preconditions.checkNotNull(nodeStatus);
       Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
       pipeline.setLeaderId(leaderId);
+      pipeline.setNodeIdsHash(nodeIdsHash);
       // overwrite with original creationTimestamp
       if (creationTimestamp != null) {
         pipeline.setCreationTimestamp(creationTimestamp);
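
For reference, the providers later in this patch exercise the extended
builder in exactly this way; a minimal sketch, assuming a three-entry
List<DatanodeDetails> named dns (the variable is hypothetical):

    // Sketch only: mirrors the builder chain used by RatisPipelineProvider.
    Pipeline pipeline = Pipeline.newBuilder()
        .setId(PipelineID.randomId())
        .setState(Pipeline.PipelineState.ALLOCATED)
        .setType(HddsProtos.ReplicationType.RATIS)
        .setFactor(HddsProtos.ReplicationFactor.THREE)
        .setNodes(dns)
        .setNodeIdsHash(
            RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(dns))
        .build();
    // Non-zero for three distinct node UUIDs, barring an XOR collision.
    int hash = pipeline.getNodeIdsHash();
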
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 23eb574..bc65d14 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -162,7 +162,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     // filter nodes that meet the size and pipeline engagement criteria.
     // Pipeline placement doesn't take node space left into account.
     List<DatanodeDetails> healthyList = healthyNodes.stream()
-        .filter(d -> meetCriteria(d, nodesRequired)).limit(nodesRequired)
+        .filter(d -> meetCriteria(d, nodesRequired))
         .collect(Collectors.toList());
 
     if (healthyList.size() < nodesRequired) {
@@ -308,6 +308,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
     }
     // the pick is decided and it should be removed from candidates.
     healthyNodes.remove(datanodeDetails);
+
     return datanodeDetails;
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index bb56a03..051202b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -132,6 +132,13 @@ public class PipelineStateManager {
       pipeline = pipelineStateMap
           .updatePipelineState(pipelineId, PipelineState.OPEN);
     }
+    // Amend nodeIdsHash if needed.
+    if (pipeline.getType() == ReplicationType.RATIS &&
+        pipeline.getFactor() == ReplicationFactor.THREE &&
+        pipeline.getNodeIdsHash() == 0) {
+      pipeline.setNodeIdsHash(RatisPipelineUtils
+          .encodeNodeIdsOfFactorThreePipeline(pipeline.getNodes()));
+    }
     return pipeline;
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 23b02ed..9585907 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -157,6 +157,7 @@ public class RatisPipelineProvider implements PipelineProvider {
     }
 
     List<DatanodeDetails> dns;
+    int nodeIdHash = 0;
 
     switch(factor) {
     case ONE:
@@ -165,6 +166,7 @@ public class RatisPipelineProvider implements PipelineProvider {
     case THREE:
       dns = placementPolicy.chooseDatanodes(null,
           null, factor.getNumber(), 0);
+      nodeIdHash = RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(dns);
       break;
     default:
       throw new IllegalStateException("Unknown factor: " + factor.name());
@@ -176,6 +178,7 @@ public class RatisPipelineProvider implements PipelineProvider {
         .setType(ReplicationType.RATIS)
         .setFactor(factor)
         .setNodes(dns)
+        .setNodeIdsHash(nodeIdHash)
         .build();
 
     // Send command to datanodes to create pipeline
@@ -196,12 +199,17 @@ public class RatisPipelineProvider implements PipelineProvider {
   @Override
   public Pipeline create(ReplicationFactor factor,
                          List<DatanodeDetails> nodes) {
+    int nodeIdHash = 0;
+    if (factor == ReplicationFactor.THREE) {
+      nodeIdHash = RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes);
+    }
     return Pipeline.newBuilder()
         .setId(PipelineID.randomId())
         .setState(PipelineState.ALLOCATED)
         .setType(ReplicationType.RATIS)
         .setFactor(factor)
         .setNodes(nodes)
+        .setNodeIdsHash(nodeIdHash)
         .build();
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index b8cdf06..f9f2011 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -18,9 +18,12 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
 import org.apache.ratis.client.RaftClient;
@@ -33,7 +36,6 @@ import org.apache.ratis.rpc.SupportedRpcType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * Utility class for Ratis pipelines. Contains methods to create and destroy
  * ratis pipelines.
@@ -93,4 +95,38 @@ public final class RatisPipelineUtils {
           true, p.getId());
     }
   }
+
+  static int encodeNodeIdsOfFactorThreePipeline(List<DatanodeDetails> nodes) {
+    if (nodes.size() != HddsProtos.ReplicationFactor.THREE.getNumber()) {
+      return 0;
+    }
+    return nodes.get(0).getUuid().hashCode() ^
+        nodes.get(1).getUuid().hashCode() ^
+        nodes.get(2).getUuid().hashCode();
+  }
+
+  /**
+   * Return the first existing pipeline that shares the same set of
+   * datanodes as the input pipeline.
+   * @param stateManager PipelineStateManager
+   * @param pipeline input pipeline
+   * @return first matched pipeline
+   */
+  static Pipeline checkPipelineContainSameDatanodes(
+      PipelineStateManager stateManager, Pipeline pipeline) {
+    List<Pipeline> matchedPipelines = stateManager.getPipelines(
+        HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.THREE)
+        .stream().filter(p -> !p.getId().equals(pipeline.getId()) &&
+            // For all OPEN or ALLOCATED pipelines
+            (p.getPipelineState() == Pipeline.PipelineState.OPEN ||
+                p.getPipelineState() == Pipeline.PipelineState.ALLOCATED) &&
+            p.getNodeIdsHash() == pipeline.getNodeIdsHash())
+        .collect(Collectors.toList());
+    if (matchedPipelines.isEmpty()) {
+      return null;
+    } else {
+      return matchedPipelines.get(0);
+    }
+  }
 }
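
Note that the fingerprint is a hash of hashes, so distinct node sets can in
principle collide. checkPipelineContainSameDatanodes compares fingerprints
only, which suffices for the metrics and logging below; a caller needing
certainty could confirm actual membership after a hash match. A hedged
sketch, not part of the patch:

    import java.util.HashSet;
    import java.util.List;
    import java.util.UUID;

    final class SameNodeSetCheck {

      // Cheap fingerprint comparison first, then an exact set comparison
      // to rule out false positives from XOR collisions.
      static boolean sameDatanodes(List<UUID> a, int hashA,
                                   List<UUID> b, int hashB) {
        return hashA == hashB && new HashSet<>(a).equals(new HashSet<>(b));
      }
    }
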
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 01af465..11e9916 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -172,6 +172,18 @@ public class SCMPipelineManager implements PipelineManager {
         metrics.incNumPipelineCreated();
         metrics.createPerPipelineMetrics(pipeline);
       }
+      Pipeline overlapPipeline = RatisPipelineUtils
+          .checkPipelineContainSameDatanodes(stateManager, pipeline);
+      if (overlapPipeline != null) {
+        metrics.incNumPipelineContainSameDatanodes();
+        // TODO: remove once pipeline allocation is proven evenly distributed.
+        LOG.info("Pipeline: " + pipeline.getId().toString() +
+            " contains the same datanodes as previous pipeline: " +
+            overlapPipeline.getId().toString() + " nodeIds: " +
+            pipeline.getNodes().get(0).getUuid().toString() +
+            ", " + pipeline.getNodes().get(1).getUuid().toString() +
+            ", " + pipeline.getNodes().get(2).getUuid().toString());
+      }
       return pipeline;
     } catch (IOException ex) {
       metrics.incNumPipelineCreationFailed();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index 8c348ed..1cf8d3a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -54,6 +54,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
   private @Metric MutableCounterLong numPipelineDestroyFailed;
   private @Metric MutableCounterLong numPipelineReportProcessed;
   private @Metric MutableCounterLong numPipelineReportProcessingFailed;
+  private @Metric MutableCounterLong numPipelineContainSameDatanodes;
   private Map<PipelineID, MutableCounterLong> numBlocksAllocated;
 
   /** Private constructor. */
@@ -92,6 +93,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
     numPipelineDestroyFailed.snapshot(recordBuilder, true);
     numPipelineReportProcessed.snapshot(recordBuilder, true);
     numPipelineReportProcessingFailed.snapshot(recordBuilder, true);
+    numPipelineContainSameDatanodes.snapshot(recordBuilder, true);
     numBlocksAllocated
         .forEach((pid, metric) -> metric.snapshot(recordBuilder, true));
   }
@@ -176,4 +178,11 @@ public final class SCMPipelineMetrics implements MetricsSource {
   void incNumPipelineReportProcessingFailed() {
     numPipelineReportProcessingFailed.incr();
   }
+
+  /**
+   * Increments the count of pipelines that share the same set of datanodes.
+   */
+  void incNumPipelineContainSameDatanodes() {
+    numPipelineContainSameDatanodes.incr();
+  }
 }
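
The new counter can be read back in tests with the same MetricsAsserts
helpers this patch already uses in TestSCMPipelineManager; a test-fragment
sketch, assuming the metrics source registers under its simple class name:

    import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
    import static org.apache.hadoop.test.MetricsAsserts.getMetrics;

    // Fragment only: the source name is an assumption.
    long overlaps = getLongCounter("NumPipelineContainSameDatanodes",
        getMetrics(SCMPipelineMetrics.class.getSimpleName()));
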
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index bca4189..cbeef7f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -17,7 +17,7 @@
 package org.apache.hadoop.hdds.scm.container;
 
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.net.NetConstants;
@@ -93,7 +93,8 @@ public class MockNodeManager implements NodeManager {
   private NetworkTopology clusterMap;
   private ConcurrentMap<String, Set<String>> dnsToUuidMap;
 
-  public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
+  public MockNodeManager(NetworkTopologyImpl clusterMap,
+                         boolean initializeFakeNodes, int nodeCount) {
     this.healthyNodes = new LinkedList<>();
     this.staleNodes = new LinkedList<>();
     this.deadNodes = new LinkedList<>();
@@ -101,8 +102,8 @@ public class MockNodeManager implements NodeManager {
     this.node2PipelineMap = new Node2PipelineMap();
     this.node2ContainerMap = new Node2ContainerMap();
     this.dnsToUuidMap = new ConcurrentHashMap<>();
-    aggregateStat = new SCMNodeStat();
-    clusterMap = new NetworkTopologyImpl(new Configuration());
+    this.aggregateStat = new SCMNodeStat();
+    this.clusterMap = clusterMap;
     if (initializeFakeNodes) {
       for (int x = 0; x < nodeCount; x++) {
         DatanodeDetails dd = MockDatanodeDetails.randomDatanodeDetails();
@@ -114,6 +115,11 @@ public class MockNodeManager implements NodeManager {
     this.commandMap = new HashMap<>();
   }
 
+  public MockNodeManager(boolean initializeFakeNodes, int nodeCount) {
+    this(new NetworkTopologyImpl(new OzoneConfiguration()),
+        initializeFakeNodes, nodeCount);
+  }
+
   /**
    * Invoked from ctor to create some node Metrics.
    *
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index 612bf5d..f35bfe2 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
@@ -67,13 +68,14 @@ public class TestCloseContainerEventHandler {
         .getTestDir(TestCloseContainerEventHandler.class.getSimpleName());
     configuration
         .set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    configuration.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT, 16);
     nodeManager = new MockNodeManager(true, 10);
     eventQueue = new EventQueue();
     pipelineManager =
         new SCMPipelineManager(configuration, nodeManager, eventQueue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
-            pipelineManager.getStateManager(), configuration);
+            pipelineManager.getStateManager(), configuration, eventQueue);
     pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
     containerManager = new SCMContainerManager(configuration, pipelineManager);
@@ -91,6 +93,9 @@ public class TestCloseContainerEventHandler {
     if (containerManager != null) {
       containerManager.close();
     }
+    if (pipelineManager != null) {
+      pipelineManager.close();
+    }
     FileUtil.fullyDelete(testDir);
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index ff52470..3eb146a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -73,6 +73,8 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
           .setType(initialPipeline.getType())
           .setFactor(factor)
           .setNodes(initialPipeline.getNodes())
+          .setNodeIdsHash(RatisPipelineUtils
+              .encodeNodeIdsOfFactorThreePipeline(initialPipeline.getNodes()))
           .build();
     }
   }
@@ -91,6 +93,8 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
         .setType(HddsProtos.ReplicationType.RATIS)
         .setFactor(factor)
         .setNodes(nodes)
+        .setNodeIdsHash(RatisPipelineUtils
+            .encodeNodeIdsOfFactorThreePipeline(nodes))
         .build();
   }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
new file mode 100644
index 0000000..45f85ef
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE;
+
+/**
+ * Test for pipeline datanodes intersection.
+ */
+@RunWith(Parameterized.class)
+public class TestPipelineDatanodesIntersection {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestPipelineDatanodesIntersection.class.getName());
+
+  private int nodeCount;
+  private int nodeHeaviness;
+  private OzoneConfiguration conf;
+  private boolean end;
+
+  @Before
+  public void initialize() {
+    conf = new OzoneConfiguration();
+    end = false;
+  }
+
+  public TestPipelineDatanodesIntersection(int nodeCount, int nodeHeaviness) {
+    this.nodeCount = nodeCount;
+    this.nodeHeaviness = nodeHeaviness;
+  }
+
+  @Parameterized.Parameters
+  public static Collection inputParams() {
+    return Arrays.asList(new Object[][] {
+        {4, 5},
+        {10, 5},
+        {20, 5},
+        {50, 5},
+        {100, 5},
+        {100, 10}
+    });
+  }
+
+  @Test
+  public void testPipelineDatanodesIntersection() {
+    NodeManager nodeManager = new MockNodeManager(true, nodeCount);
+    conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, nodeHeaviness);
+    conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false);
+    PipelineStateManager stateManager = new PipelineStateManager(conf);
+    PipelineProvider provider = new MockRatisPipelineProvider(nodeManager,
+        stateManager, conf);
+
+    int healthyNodeCount = nodeManager
+        .getNodeCount(HddsProtos.NodeState.HEALTHY);
+    int intersectionCount = 0;
+    int createdPipelineCount = 0;
+    while (!end && createdPipelineCount <= healthyNodeCount * nodeHeaviness) {
+      try {
+        Pipeline pipeline = provider.create(HddsProtos.ReplicationFactor.THREE);
+        stateManager.addPipeline(pipeline);
+        nodeManager.addPipeline(pipeline);
+        Pipeline overlapPipeline = RatisPipelineUtils
+            .checkPipelineContainSameDatanodes(stateManager, pipeline);
+        if (overlapPipeline != null) {
+          intersectionCount++;
+          LOG.info("This pipeline: " + pipeline.getId().toString() +
+              " overlaps with previous pipeline: " + overlapPipeline.getId() +
+              ". They share the same set of datanodes: " +
+              pipeline.getNodesInOrder().get(0).getUuid() + "/" +
+              pipeline.getNodesInOrder().get(1).getUuid() + "/" +
+              pipeline.getNodesInOrder().get(2).getUuid() + " and " +
+              overlapPipeline.getNodesInOrder().get(0).getUuid() + "/" +
+              overlapPipeline.getNodesInOrder().get(1).getUuid() + "/" +
+              overlapPipeline.getNodesInOrder().get(2).getUuid() +
+              ".");
+        }
+        createdPipelineCount++;
+      } catch(SCMException e) {
+        end = true;
+      } catch (IOException e) {
+        end = true;
+        // Should not throw regular IOException.
+        Assert.fail();
+      }
+    }
+
+    end = false;
+
+    LOG.info("Among total " +
+        stateManager.getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE).size() + " created pipelines" +
+        " with " + healthyNodeCount + " healthy datanodes and " +
+        nodeHeaviness + " as node heaviness, " +
+        intersectionCount + " pipelines has same set of datanodes.");
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 66991e4..46fd8c8 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
@@ -34,9 +35,14 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
 
 import static org.apache.commons.collections.CollectionUtils.intersection;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -50,12 +56,13 @@ public class TestRatisPipelineProvider {
   private NodeManager nodeManager;
   private PipelineProvider provider;
   private PipelineStateManager stateManager;
+  private OzoneConfiguration conf;
 
   @Before
   public void init() throws Exception {
     nodeManager = new MockNodeManager(true, 10);
     OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
+    conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
     stateManager = new PipelineStateManager();
     provider = new MockRatisPipelineProvider(nodeManager,
         stateManager, conf);
@@ -75,8 +82,12 @@ public class TestRatisPipelineProvider {
     // New pipeline should not overlap with the previous created pipeline
     assertTrue(
         intersection(pipeline.getNodes(), pipeline1.getNodes())
-            .isEmpty());
+            .size() < factor.getNumber());
+    if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) {
+      assertNotEquals(pipeline.getNodeIdsHash(), pipeline1.getNodeIdsHash());
+    }
     stateManager.addPipeline(pipeline1);
+    nodeManager.addPipeline(pipeline1);
   }
 
   @Test
@@ -92,10 +103,9 @@ public class TestRatisPipelineProvider {
     assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE,
         Pipeline.PipelineState.ALLOCATED);
     stateManager.addPipeline(pipeline1);
-    // New pipeline should overlap with the previous created pipeline,
-    // and one datanode should overlap between the two types.
-    assertEquals(1,
-        intersection(pipeline.getNodes(), pipeline1.getNodes()).size());
+    // With enough pipeline quota on the datanodes, the two pipelines
+    // should not share the same set of datanodes.
+    assertNotEquals(pipeline.getNodeIdsHash(), pipeline1.getNodeIdsHash());
   }
 
   @Test
@@ -131,6 +141,49 @@ public class TestRatisPipelineProvider {
   }
 
   @Test
+  public void testComputeNodeIdsHash() {
+    int total = HddsProtos.ReplicationFactor.THREE.getNumber();
+    List<DatanodeDetails> nodes1 = new ArrayList<>();
+    for (int i = 0; i < total; i++) {
+      nodes1.add(MockDatanodeDetails.createDatanodeDetails(
+          UUID.fromString("00000-11000-00000-00000-0000" + (i + 1))));
+    }
+
+    Assert.assertEquals(total, nodes1.size());
+    Assert.assertNotEquals(0,
+        RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes1));
+
+    List<DatanodeDetails> nodes2 = new ArrayList<>();
+    for (int i = 0; i < total; i++) {
+      nodes2.add(MockDatanodeDetails.createDatanodeDetails(
+          UUID.fromString("00000-11000-00000-00000-0000" + (total - i))));
+    }
+    Assert.assertEquals(total, nodes2.size());
+    Assert.assertNotEquals(0,
+        RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes2));
+
+    Assert.assertEquals(
+        RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes1),
+        RatisPipelineUtils.encodeNodeIdsOfFactorThreePipeline(nodes2));
+  }
+
+  @Test
+  public void testCreateFactorTHREEPipelineWithSameDatanodes() {
+    List<DatanodeDetails> healthyNodes = nodeManager
+        .getNodes(HddsProtos.NodeState.HEALTHY).stream()
+        .limit(3).collect(Collectors.toList());
+
+    Pipeline pipeline1 = provider.create(
+        HddsProtos.ReplicationFactor.THREE, healthyNodes);
+    Pipeline pipeline2 = provider.create(
+        HddsProtos.ReplicationFactor.THREE, healthyNodes);
+
+    Assert.assertTrue(pipeline1.getNodes().parallelStream()
+        .allMatch(pipeline2.getNodes()::contains));
+    Assert.assertEquals(pipeline1.getNodeIdsHash(), pipeline2.getNodeIdsHash());
+  }
+
+  @Test
   public void testCreatePipelinesDnExclude() throws IOException {
     List<DatanodeDetails> healthyNodes =
         nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
@@ -141,7 +194,11 @@ public class TestRatisPipelineProvider {
 
     // Use up first 3 DNs for an open pipeline.
     List<DatanodeDetails> dns = healthyNodes.subList(0, 3);
-    addPipeline(dns, factor, Pipeline.PipelineState.OPEN, REPLICATION_TYPE);
+    for (int i = 0; i < conf.getInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+        OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT); i++) {
+      // Saturate pipeline counts on the first 3 DNs.
+      addPipeline(dns, factor, Pipeline.PipelineState.OPEN, REPLICATION_TYPE);
+    }
     Set<DatanodeDetails> membersOfOpenPipelines = new HashSet<>(dns);
 
     // Use up next 3 DNs for a closed pipeline.
@@ -160,7 +217,7 @@ public class TestRatisPipelineProvider {
     List<DatanodeDetails> nodes = pipeline.getNodes();
 
     assertTrue(
-        "nodes of new pipeline cannot be from open pipelines",
+        "nodes of new pipeline cannot be all from open pipelines",
         nodes.stream().noneMatch(membersOfOpenPipelines::contains));
 
     assertTrue(
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 491e289..e6bf7a0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import static org.apache.commons.collections.CollectionUtils.intersection;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
@@ -30,6 +31,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
@@ -89,8 +91,10 @@ public class TestSCMPipelineManager {
             pipelineManager.getStateManager(), conf);
     pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
+    int pipelineNum = 5;
+
     Set<Pipeline> pipelines = new HashSet<>();
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < pipelineNum; i++) {
       Pipeline pipeline = pipelineManager
           .createPipeline(HddsProtos.ReplicationType.RATIS,
               HddsProtos.ReplicationFactor.THREE);
@@ -112,6 +116,15 @@ public class TestSCMPipelineManager {
     List<Pipeline> pipelineList =
         pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipelines, new HashSet<>(pipelineList));
+    // All NodeIdsHash from original pipeline list
+    List<Integer> originalPipelineHash = pipelineList.stream()
+        .map(Pipeline::getNodeIdsHash).collect(Collectors.toList());
+    // All NodeIdsHash from reloaded pipeline list
+    List<Integer> reloadedPipelineHash = pipelines.stream()
+        .map(Pipeline::getNodeIdsHash).collect(Collectors.toList());
+    // The original NodeIdsHash list should match the reloaded one.
+    Assert.assertEquals(pipelineNum,
+        intersection(originalPipelineHash, reloadedPipelineHash).size());
 
     // clean up
     for (Pipeline pipeline : pipelines) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 7e8ec52..0698443 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -168,11 +168,11 @@ public class ReplicationNodeManagerMock implements NodeManager {
 
   /**
   * Get the count of pipelines a datanode is associated with.
-   * @param dnId DatanodeDetails
+   * @param dn DatanodeDetails
    * @return The number of pipelines
    */
   @Override
-  public int getPipelinesCount(DatanodeDetails dnId) {
+  public int getPipelinesCount(DatanodeDetails dn) {
     throw new UnsupportedOperationException("Not yet implemented");
   }
 

