Posted to issues@ozone.apache.org by "whbing (via GitHub)" <gi...@apache.org> on 2023/09/23 18:59:10 UTC

[GitHub] [ozone] whbing opened a new pull request, #5354: HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space

whbing opened a new pull request, #5354:
URL: https://github.com/apache/ozone/pull/5354

   ## What changes were proposed in this pull request?
   
   Add a new pipeline choose policy implementation, `CapacityPipelineChoosePolicy`, which takes datanode storage space into account.
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/HDDS-9345
   
   ## How was this patch tested?
   
   1. Unit test
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@ozone.apache.org
For additional commands, e-mail: issues-help@ozone.apache.org


Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "adoroszlai (via GitHub)" <gi...@apache.org>.
adoroszlai commented on code in PR #5354:
URL: https://github.com/apache/ozone/pull/5354#discussion_r1457831784


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/PipelineChoosePolicy.java:
##########
@@ -26,6 +27,15 @@
  */
 public interface PipelineChoosePolicy {
 
+  /**
+   * Updates the policy with NodeManager.
+   * @return updated policy.
+   */
+  default PipelineChoosePolicy init(final NodeManager nodeManager) {
+    // override if the policy requires nodeManager
+    return this;
+  }

Review Comment:
   Nice!
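
For readers unfamiliar with the pattern in the hunk above, here is a minimal sketch of a default `init` method that only dependency-aware policies override. `UsageSource`, `ChoosePolicy`, `FirstPolicy`, and `LeastUsedPolicy` are hypothetical stand-ins, not Ozone classes, and returning a new instance from the overriding `init` is an assumption of this sketch, not necessarily what the PR does.

```java
import java.util.Arrays;
import java.util.List;

final class InitSketch {
  /** Hypothetical stand-in for the node manager dependency. */
  interface UsageSource {
    long usedBytes(String node);
  }

  /** Simplified policy interface with the same default-method shape as the diff. */
  interface ChoosePolicy {
    String choose(List<String> candidates);

    /** Policies that need no dependency inherit this no-op and return themselves. */
    default ChoosePolicy init(UsageSource source) {
      return this;
    }
  }

  /** Stateless policy: relies on the default init(). */
  static final class FirstPolicy implements ChoosePolicy {
    @Override
    public String choose(List<String> candidates) {
      return candidates.get(0);
    }
  }

  /** Dependency-aware policy: init() hands back a configured instance. */
  static final class LeastUsedPolicy implements ChoosePolicy {
    private final UsageSource source;

    LeastUsedPolicy() {
      this(null);
    }

    private LeastUsedPolicy(UsageSource source) {
      this.source = source;
    }

    @Override
    public ChoosePolicy init(UsageSource newSource) {
      return new LeastUsedPolicy(newSource);
    }

    @Override
    public String choose(List<String> candidates) {
      if (source == null) {
        throw new IllegalStateException("init() was not called");
      }
      String best = candidates.get(0);
      for (String candidate : candidates) {
        if (source.usedBytes(candidate) < source.usedBytes(best)) {
          best = candidate;
        }
      }
      return best;
    }
  }

  public static void main(String[] args) {
    UsageSource source = node -> node.equals("dn1") ? 10L : 50L;
    ChoosePolicy policy = new LeastUsedPolicy().init(source);
    System.out.println(policy.choose(Arrays.asList("dn0", "dn1", "dn2"))); // prints "dn1"
  }
}
```

The caller can invoke `init` on any policy without checking its type; policies that ignore the dependency pay nothing for it.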





Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "whbing (via GitHub)" <gi...@apache.org>.
whbing commented on code in PR #5354:
URL: https://github.com/apache/ozone/pull/5354#discussion_r1406134826


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The capacity pipeline choose policy that chooses pipeline
+ * with relatively more remaining datanode space.
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {

Review Comment:
   > This is also a random pipeline choice, so it could be renamed RandomCapacityPipelineChoosePolicy. The comments could also give a brief description: choose 2 random pipelines and return the one with lower usage.
   
   @sumitagrawl Thanks! Will update in the next commit.
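
The "choose 2 random pipelines and return the one with lower usage" idea can be sketched on plain numbers. This is an illustration only: `TwoChoiceSketch`, `chooseIndex`, and `simulate` are hypothetical names, each pipeline is reduced to a single used-bytes value, and the two candidates are drawn uniformly with replacement, whereas the PR draws them through `HealthyPipelineChoosePolicy`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

final class TwoChoiceSketch {
  /**
   * Draws two indices uniformly at random (with replacement) and returns
   * the one whose usage is lower; a tie keeps the first draw.
   */
  static int chooseIndex(List<Long> usedBytes, Random random) {
    int first = random.nextInt(usedBytes.size());
    int second = random.nextInt(usedBytes.size());
    return usedBytes.get(first) <= usedBytes.get(second) ? first : second;
  }

  /** Counts how often each index is chosen over the given number of trials. */
  static int[] simulate(List<Long> usedBytes, int trials, long seed) {
    Random random = new Random(seed);
    int[] counts = new int[usedBytes.size()];
    for (int i = 0; i < trials; i++) {
      counts[chooseIndex(usedBytes, random)]++;
    }
    return counts;
  }

  public static void main(String[] args) {
    // Four candidates with increasing usage; lower usage should win more often.
    int[] counts = simulate(Arrays.asList(0L, 10L, 20L, 30L), 10_000, 42L);
    System.out.println(Arrays.toString(counts));
  }
}
```

With four candidates the least used one is returned with probability 7/16 and the most used one with probability 1/16, so the choice stays random but is biased toward emptier candidates.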





Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "whbing (via GitHub)" <gi...@apache.org>.
whbing commented on PR #5354:
URL: https://github.com/apache/ozone/pull/5354#issuecomment-1840883564

   > Are we planning to make this policy the default, or is it just an option? We may need to document it.
   
   @sumitagrawl It is provided as an option only and does NOT change the default value. A description is added in `ScmConfig.java`.




Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "adoroszlai (via GitHub)" <gi...@apache.org>.
adoroszlai commented on PR #5354:
URL: https://github.com/apache/ozone/pull/5354#issuecomment-1904447621

   Thanks @whbing for the patch, @siddhantsangwan, @sodonnel, @sumitagrawl, @xichen01 for the review.




Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "adoroszlai (via GitHub)" <gi...@apache.org>.
adoroszlai commented on PR #5354:
URL: https://github.com/apache/ozone/pull/5354#issuecomment-1898999362

   @siddhantsangwan @sodonnel please take another look




Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "whbing (via GitHub)" <gi...@apache.org>.
whbing commented on code in PR #5354:
URL: https://github.com/apache/ozone/pull/5354#discussion_r1406133546


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The capacity pipeline choose policy that chooses pipeline
+ * with relatively more remaining datanode space.
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineChoosePolicy.class);
+
+  private final NodeManager nodeManager;
+
+  private final PipelineChoosePolicy healthPolicy;
+
+  public CapacityPipelineChoosePolicy(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
+    healthPolicy = new HealthyPipelineChoosePolicy(nodeManager);
+  }
+
+  @Override
+  public Pipeline choosePipeline(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    Pipeline pipeline1 = healthPolicy.choosePipeline(pipelineList, pri);
+    Pipeline pipeline2 = healthPolicy.choosePipeline(pipelineList, pri);
+
+    int result = new CapacityPipelineComparator(this)
+        .compare(pipeline1, pipeline2);
+
+    LOG.debug("Chosen the {} pipeline", result <= 0 ? "first" : "second");
+    return result <= 0 ? pipeline1 : pipeline2;
+  }
+
+  @Override
+  public int choosePipelineIndex(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    List<Pipeline> mutableList = new ArrayList<>(pipelineList);
+    Pipeline pipeline = choosePipeline(mutableList, pri);
+    return pipelineList.indexOf(pipeline);
+  }
+
+  /**
+   * Return a list of SCMNodeMetrics corresponding to the DataNodes in the
+   * pipeline, sorted in descending order based on scm used storage.
+   * @param pipeline pipeline
+   * @return sorted SCMNodeMetrics corresponding the pipeline
+   */
+  private Deque<SCMNodeMetric> getSortedNodeFromPipeline(Pipeline pipeline) {
+    Deque<SCMNodeMetric> sortedNodeStack = new ArrayDeque<>();
+    pipeline.getNodes().stream()
+        .map(nodeManager::getNodeStat)
+        .filter(Objects::nonNull)
+        .sorted()
+        .forEach(sortedNodeStack::push);
+    return sortedNodeStack;
+  }
+
+  static class CapacityPipelineComparator implements Comparator<Pipeline> {
+    private final CapacityPipelineChoosePolicy policy;
+
+    CapacityPipelineComparator(CapacityPipelineChoosePolicy policy) {
+      this.policy = policy;
+    }
+    @Override
+    public int compare(Pipeline p1, Pipeline p2) {
+      if (p1.getId().equals(p2.getId())) {
+        LOG.debug("Compare the same pipeline {}", p1);
+        return 0;
+      }
+      Deque<SCMNodeMetric> sortedNodes1 = policy.getSortedNodeFromPipeline(p1);
+      Deque<SCMNodeMetric> sortedNodes2 = policy.getSortedNodeFromPipeline(p2);
+
+      if (sortedNodes1.isEmpty() || sortedNodes2.isEmpty()) {
+        LOG.warn("Cannot obtain SCMNodeMetric in pipeline {} or {}", p1, p2);
+        return 0;
+      }
+      LOG.debug("Compare scmUsed weight in pipelines, first : {}, second : {}",
+          sortedNodes1, sortedNodes2);
+      // Compare the scmUsed of the first node in the two sorted node stacks
+      int result = sortedNodes1.pop().compareTo(sortedNodes2.pop());
+
+      if (result == 0 && !sortedNodes1.isEmpty() && !sortedNodes2.isEmpty()) {

Review Comment:
   > Only 2 nodes are compared, but not all the nodes in pipeline, any reason?
   
   It's true that comparing all nodes would be more precise. My initial thought was that two rounds of comparison should cover most cases. I will try modifying it and add some tests.
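
A rough sketch of what comparing all nodes could look like: sort each pipeline's nodes from most used to least used and compare position by position until one side differs. `AllNodesCompareSketch` and `compareByUsage` are hypothetical names, and raw used bytes stand in for `SCMNodeMetric.compareTo`, which is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class AllNodesCompareSketch {
  /**
   * Compares two pipelines by per-node used bytes, most used node first.
   * Returns a negative value if the first pipeline is less used, a positive
   * value if it is more used, and 0 if every compared position ties.
   */
  static int compareByUsage(List<Long> used1, List<Long> used2) {
    List<Long> sorted1 = sortedDescending(used1);
    List<Long> sorted2 = sortedDescending(used2);
    int rounds = Math.min(sorted1.size(), sorted2.size());
    for (int i = 0; i < rounds; i++) {
      int result = Long.compare(sorted1.get(i), sorted2.get(i));
      if (result != 0) {
        return result; // first differing position decides
      }
    }
    return 0;
  }

  /** Returns a sorted copy so the caller's list is left untouched. */
  private static List<Long> sortedDescending(List<Long> values) {
    List<Long> copy = new ArrayList<>(values);
    copy.sort(Collections.reverseOrder());
    return copy;
  }

  public static void main(String[] args) {
    // Two pipelines that share their two most used nodes (20 and 30 used).
    List<Long> pipelineA = Arrays.asList(10L, 20L, 30L);
    List<Long> pipelineB = Arrays.asList(0L, 20L, 30L);
    System.out.println(compareByUsage(pipelineA, pipelineB) > 0
        ? "pipelineB is less used" : "pipelineA is less used or tied");
  }
}
```

The loop needs at most as many rounds as there are nodes in the smaller pipeline, so for three-node Ratis pipelines the extra cost over a two-round comparison is one more comparison.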





Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "whbing (via GitHub)" <gi...@apache.org>.
whbing commented on code in PR #5354:
URL: https://github.com/apache/ozone/pull/5354#discussion_r1399992438


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The capacity pipeline choose policy that chooses pipeline
+ * with relatively more remaining datanode space.
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineChoosePolicy.class);
+
+  private final NodeManager nodeManager;
+
+  private PipelineChoosePolicy healthPolicy;
+
+  public CapacityPipelineChoosePolicy(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
+    healthPolicy = new HealthyPipelineChoosePolicy(nodeManager);
+  }
+
+  @Override
+  public Pipeline choosePipeline(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    Pipeline targetPipeline;
+    Pipeline pipeline1 = healthPolicy.choosePipeline(pipelineList, pri);
+    Pipeline pipeline2 = healthPolicy.choosePipeline(pipelineList, pri);
+    if (pipeline1.getId().equals(pipeline2.getId())) {
+      targetPipeline = pipeline1;
+      LOG.debug("Chosen pipeline = {}", targetPipeline);
+    } else {
+      SCMNodeMetric metric1 = getMaxUsageNodeFromPipeline(pipeline1);
+      SCMNodeMetric metric2 = getMaxUsageNodeFromPipeline(pipeline2);
+      if (metric1 == null || metric2 == null) {
+        LOG.warn("Can't get SCMNodeStat from pipeline: {} or {}.",
+            pipeline1, pipeline2);
+        targetPipeline = pipeline1;
+      } else {
+        targetPipeline =
+            !metric1.isGreater(metric2.get()) ? pipeline1 : pipeline2;

Review Comment:
   > It's possible that we're checking two pipelines which share a datanode (multi raft), and that datanode is the most used one in both the pipelines. This will result in a tie and we'll choose the first pipeline. I'm wondering if it's better to break the tie by comparing the second most used node in that case.
   
   @siddhantsangwan Thanks for the review! It's a good idea to consider a second node. I'll update the code later.
   (Also, I don't think there is a need to consider a third node, as that might make the logic quite redundant.)





Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "whbing (via GitHub)" <gi...@apache.org>.
whbing commented on code in PR #5354:
URL: https://github.com/apache/ozone/pull/5354#discussion_r1457657821


##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestCapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for the capacity pipeline choose policy.
+ */
+public class TestCapacityPipelineChoosePolicy {
+  @Test
+  public void choosePipeline() throws Exception {
+
+    // given 4 datanode
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    for (int i = 0; i < 4; i++) {
+      datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    }
+    //          dn0   dn1   dn2   dn3
+    // used       0   10    20    30
+    NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
+    when(mockNodeManager.getNodeStat(datanodes.get(0)))
+        .thenReturn(new SCMNodeMetric(100L, 0L, 100L));
+    when(mockNodeManager.getNodeStat(datanodes.get(1)))
+        .thenReturn(new SCMNodeMetric(100L, 10L, 90L));
+    when(mockNodeManager.getNodeStat(datanodes.get(2)))
+        .thenReturn(new SCMNodeMetric(100L, 20L, 80L));
+    when(mockNodeManager.getNodeStat(datanodes.get(3)))
+        .thenReturn(new SCMNodeMetric(100L, 30L, 70L));
+
+    CapacityPipelineChoosePolicy policy =
+        new CapacityPipelineChoosePolicy(mockNodeManager);
+
+    // generate 4 pipelines, and every pipeline has 3 datanodes
+    //
+    //  pipeline0    dn1   dn2   dn3
+    //  pipeline1    dn0   dn2   dn3
+    //  pipeline2    dn0   dn1   dn3
+    //  pipeline3    dn0   dn1   dn2
+    //
+    // In the above scenario, pipeline0 vs pipeline1 runs through three rounds
+    // of comparisons, (dn3 <-> dn3) -> (dn2 <-> dn2 ) -> (dn1 <-> dn0),
+    // finally comparing dn0 and dn1, and dn0 wins, so pipeline1 is selected.
+    //
+    List<Pipeline> pipelines = new ArrayList<>();
+    for (int i = 0; i < 4; i++) {
+      List<DatanodeDetails> dns = new ArrayList<>();
+      for (int j = 0; j < datanodes.size(); j++) {
+        if (i != j) {
+          dns.add(datanodes.get(j));
+        }
+      }
+      Pipeline pipeline = MockPipeline.createPipeline(dns);
+      MockRatisPipelineProvider.markPipelineHealthy(pipeline);
+      pipelines.add(pipeline);
+    }
+
+    Map<Pipeline, Integer> selectedCount = new HashMap<>();
+    for (Pipeline pipeline : pipelines) {
+      selectedCount.put(pipeline, 0);
+    }
+    for (int i = 0; i < 1000; i++) {
+      // choosePipeline
+      Pipeline pipeline = policy.choosePipeline(pipelines, null);
+      Assertions.assertNotNull(pipeline);

Review Comment:
   Thanks! Fixed.





Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "whbing (via GitHub)" <gi...@apache.org>.
whbing commented on code in PR #5354:
URL: https://github.com/apache/ozone/pull/5354#discussion_r1403287838


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The capacity pipeline choose policy that chooses pipeline
+ * with relatively more remaining datanode space.
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineChoosePolicy.class);
+
+  private final NodeManager nodeManager;
+
+  private final PipelineChoosePolicy healthPolicy;
+
+  public CapacityPipelineChoosePolicy(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
+    healthPolicy = new HealthyPipelineChoosePolicy(nodeManager);
+  }
+
+  @Override
+  public Pipeline choosePipeline(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    Pipeline pipeline1 = healthPolicy.choosePipeline(pipelineList, pri);
+    Pipeline pipeline2 = healthPolicy.choosePipeline(pipelineList, pri);
+
+    int result = new CapacityPipelineComparator(this)
+        .compare(pipeline1, pipeline2);
+
+    LOG.debug("Chosen the {} pipeline", result <= 0 ? "first" : "second");
+    return result <= 0 ? pipeline1 : pipeline2;
+  }
+
+  @Override
+  public int choosePipelineIndex(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    List<Pipeline> mutableList = new ArrayList<>(pipelineList);
+    Pipeline pipeline = choosePipeline(mutableList, pri);
+    return pipelineList.indexOf(pipeline);
+  }
+
+  /**
+   * Return a list of SCMNodeMetrics corresponding to the DataNodes in the
+   * pipeline, sorted in descending order based on scm used storage.
+   * @param pipeline pipeline
+   * @return sorted SCMNodeMetrics corresponding the pipeline
+   */
+  private Deque<SCMNodeMetric> getSortedNodeFromPipeline(Pipeline pipeline) {
+    Deque<SCMNodeMetric> sortedNodeStack = new ArrayDeque<>();
+    pipeline.getNodes().stream()
+        .map(nodeManager::getNodeStat)
+        .filter(Objects::nonNull)
+        .sorted()
+        .forEach(sortedNodeStack::push);
+    return sortedNodeStack;
+  }
+
+  static class CapacityPipelineComparator implements Comparator<Pipeline> {
+    private final CapacityPipelineChoosePolicy policy;
+
+    CapacityPipelineComparator(CapacityPipelineChoosePolicy policy) {
+      this.policy = policy;
+    }
+    @Override
+    public int compare(Pipeline p1, Pipeline p2) {
+      if (p1.getId().equals(p2.getId())) {
+        LOG.debug("Compare the same pipeline {}", p1);
+        return 0;
+      }
+      Deque<SCMNodeMetric> sortedNodes1 = policy.getSortedNodeFromPipeline(p1);
+      Deque<SCMNodeMetric> sortedNodes2 = policy.getSortedNodeFromPipeline(p2);
+
+      if (sortedNodes1.isEmpty() || sortedNodes2.isEmpty()) {

Review Comment:
   > How about one `sortedNodes` is empty but another is not empty, should we return the non-empty one?
   
   The emptiness check is only strictly required before popping from the stack. I was trying to keep it simple, though your suggestion is of course more precise. I wonder if it's necessary, since the stacks are usually not empty, right?
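
For reference, the stricter handling asked about in the quoted question could look roughly like this. It is a sketch under assumptions: `EmptyMetricsSketch` and `compareWithMissing` are hypothetical names, each pipeline is reduced to a list of used-byte values, and the sign convention follows the diff above, where a result `<= 0` selects the first pipeline.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class EmptyMetricsSketch {
  /**
   * Compares two pipelines by their most used node, preferring the pipeline
   * that has metrics when the other one has none.
   * A result <= 0 means "choose the first pipeline".
   */
  static int compareWithMissing(List<Long> used1, List<Long> used2) {
    if (used1.isEmpty() && used2.isEmpty()) {
      return 0; // nothing known about either side
    }
    if (used1.isEmpty()) {
      return 1; // only the second pipeline has metrics: prefer it
    }
    if (used2.isEmpty()) {
      return -1; // only the first pipeline has metrics: prefer it
    }
    return Long.compare(Collections.max(used1), Collections.max(used2));
  }

  public static void main(String[] args) {
    List<Long> noMetrics = Collections.emptyList();
    List<Long> withMetrics = Arrays.asList(10L, 20L, 30L);
    System.out.println(compareWithMissing(noMetrics, withMetrics)); // prints 1
  }
}
```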





Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on PR #5354:
URL: https://github.com/apache/ozone/pull/5354#issuecomment-1829661100

   @whbing thanks for your update, LGTM +1




Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "siddhantsangwan (via GitHub)" <gi...@apache.org>.
siddhantsangwan commented on code in PR #5354:
URL: https://github.com/apache/ozone/pull/5354#discussion_r1399239611


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The capacity pipeline choose policy that chooses pipeline
+ * with relatively more remaining datanode space.
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineChoosePolicy.class);
+
+  private final NodeManager nodeManager;
+
+  private PipelineChoosePolicy healthPolicy;
+
+  public CapacityPipelineChoosePolicy(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
+    healthPolicy = new HealthyPipelineChoosePolicy(nodeManager);
+  }
+
+  @Override
+  public Pipeline choosePipeline(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    Pipeline targetPipeline;
+    Pipeline pipeline1 = healthPolicy.choosePipeline(pipelineList, pri);
+    Pipeline pipeline2 = healthPolicy.choosePipeline(pipelineList, pri);
+    if (pipeline1.getId().equals(pipeline2.getId())) {
+      targetPipeline = pipeline1;
+      LOG.debug("Chosen pipeline = {}", targetPipeline);
+    } else {
+      SCMNodeMetric metric1 = getMaxUsageNodeFromPipeline(pipeline1);
+      SCMNodeMetric metric2 = getMaxUsageNodeFromPipeline(pipeline2);
+      if (metric1 == null || metric2 == null) {
+        LOG.warn("Can't get SCMNodeStat from pipeline: {} or {}.",
+            pipeline1, pipeline2);
+        targetPipeline = pipeline1;
+      } else {
+        targetPipeline =
+            !metric1.isGreater(metric2.get()) ? pipeline1 : pipeline2;

Review Comment:
   It's possible that we're checking two pipelines which share a datanode (multi raft), and that datanode is the most used one in both the pipelines. This will result in a tie and we'll choose the first pipeline. I'm wondering if it's better to break the tie by comparing the second most used node in that case.
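   
   A minimal sketch of that tie-break, assuming each pipeline is reduced to the used bytes of its datanodes. The names `TieBreakSketch` and `compareByUsage` and the sample values are hypothetical and not part of this PR. The idea is to sort both lists from most to least used and compare them position by position until one differs.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;

   /** Hypothetical sketch of the tie-break; not the PR's implementation. */
   final class TieBreakSketch {

     /**
      * Each list holds the used bytes of one pipeline's datanodes.
      * Returns a negative number if the first pipeline is less loaded,
      * a positive number if the second is, and 0 if every round ties.
      */
     static int compareByUsage(List<Long> first, List<Long> second) {
       List<Long> a = new ArrayList<>(first);
       List<Long> b = new ArrayList<>(second);
       // Put the most used datanode at index 0.
       a.sort(Collections.reverseOrder());
       b.sort(Collections.reverseOrder());
       int rounds = Math.min(a.size(), b.size());
       for (int i = 0; i < rounds; i++) {
         int cmp = Long.compare(a.get(i), b.get(i));
         if (cmp != 0) {
           return cmp; // this round breaks the tie
         }
       }
       return 0;
     }

     public static void main(String[] args) {
       // Both pipelines share the two most used datanodes (30 and 20).
       List<Long> p1 = Arrays.asList(30L, 20L, 10L);
       List<Long> p2 = Arrays.asList(30L, 20L, 0L);
       System.out.println(compareByUsage(p1, p2) > 0 ? "second" : "first");
     }
   }
   ```

   With these sample values the first two rounds tie (30 vs 30, 20 vs 20), and the third round (10 vs 0) selects the second pipeline.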





Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "whbing (via GitHub)" <gi...@apache.org>.
whbing commented on PR #5354:
URL: https://github.com/apache/ozone/pull/5354#issuecomment-1822303915

   The debug log is as follows:
   ```
   2023-11-22 15:45:10,855 [IPC Server handler 95 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Compare scmUsed in pipelines, first : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=623083520, remaining=86085607424}}, SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257377201, remaining=92930936832}}, SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257315761, remaining=93191069696}}], second : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3412590592, remaining=88673959936}}, SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3342024704, remaining=97891262464}}, SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=623083520, remaining=86085607424}}]
   2023-11-22 15:45:10,856 [IPC Server handler 95 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Chosen the first pipeline by compared scmUsed
   2023-11-22 15:45:16,467 [IPC Server handler 0 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Compare scmUsed in pipelines, first : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=623083520, remaining=86085607424}}, SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257377201, remaining=92930936832}}, SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257315761, remaining=93191049216}}], second : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3412590592, remaining=88673959936}}, SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3342024704, remaining=97891262464}}, SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3319808000, remaining=93105278976}}]
   2023-11-22 15:45:16,468 [IPC Server handler 0 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Chosen the first pipeline by compared scmUsed
   2023-11-22 15:45:22,142 [IPC Server handler 14 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Compare scmUsed in pipelines, first : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3412590592, remaining=88673959936}}, SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3342024704, remaining=97891262464}}, SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=623083520, remaining=86085607424}}], second : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3412590592, remaining=88673959936}}, SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3342024704, remaining=97891262464}}, SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3319808000, remaining=93105278976}}]
   2023-11-22 15:45:22,143 [IPC Server handler 14 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Secondary compare because the first round is the same
   2023-11-22 15:45:22,143 [IPC Server handler 14 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Chosen the first pipeline by compared scmUsed
   2023-11-22 15:45:27,689 [IPC Server handler 95 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Compare the same pipeline Pipeline[ Id: f1458feb-0472-4d5f-a490-97029b65dcf5, Nodes: c187d45d-e703-4b6d-a7e7-ec125f5d59f6(zk3/10.96.xx.178)67c72d5b-6fff-4f39-8e9d-ca1ad3628bc3(zk2/10.96.xx.24)43dc44df-f27c-4ade-9651-501fd881a8d6(hadoop3/10.190.xx.5), ReplicationConfig: RATIS/THREE, State:OPEN, leaderId:43dc44df-f27c-4ade-9651-501fd881a8d6, CreationTimestamp2023-11-22T15:43:54.022+08:00[Asia/Chongqing]]
   2023-11-22 15:45:27,690 [IPC Server handler 95 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Chosen the first pipeline by compared scmUsed
   2023-11-22 15:45:33,414 [IPC Server handler 0 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Compare scmUsed in pipelines, first : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=623083520, remaining=86085607424}}, SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257377201, remaining=92930936832}}, SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257315761, remaining=93191049216}}], second : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3319808000, remaining=93105278976}}, SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257377201, remaining=92930936832}}, SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257315761, remaining=93191049216}}]
   2023-11-22 15:45:33,415 [IPC Server handler 0 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Chosen the first pipeline by compared scmUsed
   2023-11-22 15:45:39,070 [IPC Server handler 14 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Compare scmUsed in pipelines, first : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3319808000, remaining=93105278976}}, SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257377201, remaining=92930936832}}, SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257315761, remaining=93191049216}}], second : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=623083520, remaining=86085607424}}, SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257377201, remaining=92930936832}}, SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257315761, remaining=93191049216}}]
   2023-11-22 15:45:39,071 [IPC Server handler 14 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Chosen the second pipeline by compared scmUsed
   2023-11-22 15:45:44,801 [IPC Server handler 97 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Compare scmUsed in pipelines, first : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3319808000, remaining=93105278976}}, SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257377201, remaining=92930936832}}, SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257317475, remaining=93190995968}}], second : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=623083520, remaining=86085607424}}, SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257377201, remaining=92930936832}}, SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257317475, remaining=93190995968}}]
   2023-11-22 15:45:44,802 [IPC Server handler 97 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Chosen the second pipeline by compared scmUsed
   2023-11-22 15:45:51,010 [IPC Server handler 14 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Compare scmUsed in pipelines, first : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3412590592, remaining=88673959936}}, SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3342024704, remaining=97895714816}}, SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=623083520, remaining=86085607424}}], second : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3412590592, remaining=88673959936}}, SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3342024704, remaining=97895714816}}, SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3319808000, remaining=93105278976}}]
   2023-11-22 15:45:51,011 [IPC Server handler 14 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Secondary compare because the first round is the same
   2023-11-22 15:45:51,011 [IPC Server handler 14 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Chosen the first pipeline by compared scmUsed
   ```
   The same log, reformatted for easier analysis:
   ```
   2023-11-22 15:45:10,855 [IPC Server handler 95 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Compare scmUsed in pipelines, 
   first :  [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=623083520, remaining=86085607424}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257377201, remaining=92930936832}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257315761, remaining=93191069696}}], 
   second : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3412590592, remaining=88673959936}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3342024704, remaining=97891262464}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=623083520, remaining=86085607424}}]
   2023-11-22 15:45:10,856 [IPC Server handler 95 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Chosen the first pipeline by compared scmUsed
   
   2023-11-22 15:45:16,467 [IPC Server handler 0 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Compare scmUsed in pipelines,
   first :  [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=623083520, remaining=86085607424}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257377201, remaining=92930936832}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257315761, remaining=93191049216}}],
   second : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3412590592, remaining=88673959936}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3342024704, remaining=97891262464}},
             SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3319808000, remaining=93105278976}}]
   2023-11-22 15:45:16,468 [IPC Server handler 0 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Chosen the first pipeline by compared scmUsed
   
   2023-11-22 15:45:22,142 [IPC Server handler 14 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Compare scmUsed in pipelines,
   first :  [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3412590592, remaining=88673959936}},
             SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3342024704, remaining=97891262464}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=623083520, remaining=86085607424}}],
   second : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3412590592, remaining=88673959936}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3342024704, remaining=97891262464}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3319808000, remaining=93105278976}}]
   2023-11-22 15:45:22,143 [IPC Server handler 14 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Secondary compare because the first round is the same
   2023-11-22 15:45:22,143 [IPC Server handler 14 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Chosen the first pipeline by compared scmUsed
   
   2023-11-22 15:45:27,689 [IPC Server handler 95 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Compare the same pipeline Pipeline[ Id: f1458feb-0472-4d5f-a490-97029b65dcf5, Nodes: c187d45d-e703-4b6d-a7e7-ec125f5d59f6(zk3/10.96.xx.178)67c72d5b-6fff-4f39-8e9d-ca1ad3628bc3(zk2/10.96.xx.24)43dc44df-f27c-4ade-9651-501fd881a8d6(hadoop3/10.190.xx.5), ReplicationConfig: RATIS/THREE, State:OPEN, leaderId:43dc44df-f27c-4ade-9651-501fd881a8d6, CreationTimestamp2023-11-22T15:43:54.022+08:00[Asia/Chongqing]]
   2023-11-22 15:45:27,690 [IPC Server handler 95 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Chosen the first pipeline by compared scmUsed
   
   2023-11-22 15:45:33,414 [IPC Server handler 0 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Compare scmUsed in pipelines,
   first :  [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=623083520, remaining=86085607424}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257377201, remaining=92930936832}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257315761, remaining=93191049216}}], 
   second : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3319808000, remaining=93105278976}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257377201, remaining=92930936832}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257315761, remaining=93191049216}}]
   2023-11-22 15:45:33,415 [IPC Server handler 0 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Chosen the first pipeline by compared scmUsed
   
   2023-11-22 15:45:39,070 [IPC Server handler 14 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Compare scmUsed in pipelines,
   first :  [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3319808000, remaining=93105278976}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257377201, remaining=92930936832}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257315761, remaining=93191049216}}],
   second : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=623083520, remaining=86085607424}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257377201, remaining=92930936832}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257315761, remaining=93191049216}}]
   2023-11-22 15:45:39,071 [IPC Server handler 14 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Chosen the second pipeline by compared scmUsed
   
   2023-11-22 15:45:44,801 [IPC Server handler 97 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Compare scmUsed in pipelines,
   first :  [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3319808000, remaining=93105278976}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257377201, remaining=92930936832}},
             SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257317475, remaining=93190995968}}],
   second : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=623083520, remaining=86085607424}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257377201, remaining=92930936832}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107313369088, scmUsed=257317475, remaining=93190995968}}]
   2023-11-22 15:45:44,802 [IPC Server handler 97 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Chosen the second pipeline by compared scmUsed
   
   2023-11-22 15:45:51,010 [IPC Server handler 14 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Compare scmUsed in pipelines,
   first :  [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3412590592, remaining=88673959936}},
             SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3342024704, remaining=97895714816}},
             SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=623083520, remaining=86085607424}}],
   second : [SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3412590592, remaining=88673959936}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3342024704, remaining=97895714816}}, 
             SCMNodeMetric{SCMNodeStat{capacity=107374182400, scmUsed=3319808000, remaining=93105278976}}]
   2023-11-22 15:45:51,011 [IPC Server handler 14 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Secondary compare because the first round is the same
   2023-11-22 15:45:51,011 [IPC Server handler 14 on default port 9863] DEBUG org.apache.hadoop.hdds.scm.PipelineChoosePolicy: Chosen the first pipeline by compared scmUsed
   ```
   The results meet expectations.
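   
   The logged decisions can also be replayed with a small standalone program. This is only a sketch: `LogReplaySketch` and `choose` are hypothetical names, and it compares the raw `scmUsed` values copied from the log above instead of calling `SCMNodeMetric`, so it may not match the real comparison in every case.

   ```java
   import java.util.Arrays;

   /** Hypothetical replay of three log entries using raw scmUsed values. */
   final class LogReplaySketch {

     /** Returns "first" or "second"; a complete tie falls back to "first". */
     static String choose(long[] firstUsed, long[] secondUsed) {
       long[] a = firstUsed.clone();
       long[] b = secondUsed.clone();
       Arrays.sort(a);
       Arrays.sort(b);
       // Walk from the most used datanode down to the least used one.
       int rounds = Math.min(a.length, b.length);
       for (int i = 1; i <= rounds; i++) {
         long x = a[a.length - i];
         long y = b[b.length - i];
         if (x != y) {
           return x < y ? "first" : "second";
         }
       }
       return "first";
     }

     public static void main(String[] args) {
       // Entry logged at 15:45:10
       System.out.println(choose(
           new long[] {623083520L, 257377201L, 257315761L},
           new long[] {3412590592L, 3342024704L, 623083520L}));
       // Entry logged at 15:45:22 (the most used datanodes are identical)
       System.out.println(choose(
           new long[] {3412590592L, 3342024704L, 623083520L},
           new long[] {3412590592L, 3342024704L, 3319808000L}));
       // Entry logged at 15:45:39
       System.out.println(choose(
           new long[] {3319808000L, 257377201L, 257315761L},
           new long[] {623083520L, 257377201L, 257315761L}));
     }
   }
   ```

   For these three entries the program prints first, first and second, which agrees with the pipelines reported as chosen in the log.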




Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "adoroszlai (via GitHub)" <gi...@apache.org>.
adoroszlai merged PR #5354:
URL: https://github.com/apache/ozone/pull/5354




Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "whbing (via GitHub)" <gi...@apache.org>.
whbing commented on code in PR #5354:
URL: https://github.com/apache/ozone/pull/5354#discussion_r1403265846


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The capacity pipeline choose policy that chooses pipeline
+ * with relatively more remaining datanode space.
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineChoosePolicy.class);
+
+  private final NodeManager nodeManager;
+
+  private final PipelineChoosePolicy healthPolicy;
+
+  public CapacityPipelineChoosePolicy(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
+    healthPolicy = new HealthyPipelineChoosePolicy(nodeManager);
+  }
+
+  @Override
+  public Pipeline choosePipeline(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    Pipeline pipeline1 = healthPolicy.choosePipeline(pipelineList, pri);

Review Comment:
   > In some clusters there may be close to a hundred pipelines, but we only compare two `Pipeline`s here. Does this make the probability of the largest (in capacity) Pipeline being selected low?
   > 
   > A possible solution is to add a configuration that determines how many Pipelines are compared at a time, expressed as a value in [0, 1]:
   > 
   > * When it is 0, only one Pipeline is selected at a time, which is basically equivalent to `RandomPipelineChoosePolicy`.
   > * When it is 1, all Pipelines are compared and the largest one overall is strictly chosen.
   > 
   > PS: Even if this feature needs to be implemented, I think it can be done in another PR. Once this PR is merged, the current solution will work in a small cluster.
   
   @xichen01 Thanks for the review! Regarding the selection logic, [HDFS-11564](https://issues.apache.org/jira/browse/HDFS-11564) links to the original papers. The algorithm of choosing 2 random nodes and then placing the container on the less utilized node is discussed in great depth in this survey paper:
   https://pdfs.semanticscholar.org/3597/66cb47572028eb70c797115e987ff203e83f.pdf
   In addition, `SCMContainerPlacementCapacity#chooseNode` also uses this algorithm. So I wonder whether it is really necessary to find the pipeline with the minimum storage usage every time.
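   
   To make the trade-off concrete, here is a rough sketch of the configurable variant proposed above, assuming each pipeline is summarized by the used bytes of its most used datanode. `SampledChoiceSketch`, `choose` and the `fraction` parameter are hypothetical, not an existing Ozone class or config key. Unlike this PR, which picks its two candidates independently (so they can be the same pipeline), the sketch samples without replacement.

   ```java
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.Collections;
   import java.util.List;
   import java.util.Random;

   /** Hypothetical sketch: compare a configurable fraction of pipelines. */
   final class SampledChoiceSketch {

     /**
      * @param maxUsed  used bytes of the most used datanode of each pipeline
      * @param fraction share of pipelines to compare, in [0, 1]
      * @return index of the least used pipeline among the sampled ones
      */
     static int choose(List<Long> maxUsed, double fraction, Random random) {
       if (maxUsed.isEmpty()) {
         throw new IllegalArgumentException("no pipelines to choose from");
       }
       int n = maxUsed.size();
       // Always look at one candidate or more, and never more than n.
       int k = Math.min(n, Math.max(1, (int) Math.ceil(fraction * n)));
       List<Integer> indexes = new ArrayList<>();
       for (int i = 0; i < n; i++) {
         indexes.add(i);
       }
       Collections.shuffle(indexes, random);
       int best = indexes.get(0);
       for (int i = 1; i < k; i++) {
         int candidate = indexes.get(i);
         if (maxUsed.get(candidate) < maxUsed.get(best)) {
           best = candidate;
         }
       }
       return best;
     }

     public static void main(String[] args) {
       List<Long> used = Arrays.asList(30L, 5L, 20L, 10L);
       Random random = new Random();
       // fraction 0.0 is a purely random pick, 0.5 compares two of the four
       // pipelines, and 1.0 compares all of them.
       for (double fraction : new double[] {0.0, 0.5, 1.0}) {
         System.out.println(fraction + " -> " + choose(used, fraction, random));
       }
     }
   }
   ```

   A fraction of 1.0 always returns the least used pipeline, at the cost of reading the usage of every pipeline on each call. Smaller fractions trade that accuracy for less work per call.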





Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "whbing (via GitHub)" <gi...@apache.org>.
whbing commented on code in PR #5354:
URL: https://github.com/apache/ozone/pull/5354#discussion_r1410464513


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The capacity pipeline choose policy that chooses pipeline
+ * with relatively more remaining datanode space.
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {

Review Comment:
   > This is also a random pipeline choice, so it could be renamed to RandomCapacityPipelineChoosePolicy. The comments could also give a brief description: choose 2 random pipelines and return the one with lower usage.
   
   @sumitagrawl  I have added the description and a test. PTAL if you have time, thanks! 
   Regarding the class name, I noticed that other classes using this algorithm have relatively concise names. Is it necessary to make any changes?





Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "adoroszlai (via GitHub)" <gi...@apache.org>.
adoroszlai commented on code in PR #5354:
URL: https://github.com/apache/ozone/pull/5354#discussion_r1455996846


##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestCapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for the capacity pipeline choose policy.
+ */
+public class TestCapacityPipelineChoosePolicy {
+  @Test
+  public void choosePipeline() throws Exception {
+
+    // given 4 datanode
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    for (int i = 0; i < 4; i++) {
+      datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    }
+    //          dn0   dn1   dn2   dn3
+    // used       0   10    20    30
+    NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
+    when(mockNodeManager.getNodeStat(datanodes.get(0)))
+        .thenReturn(new SCMNodeMetric(100L, 0L, 100L));
+    when(mockNodeManager.getNodeStat(datanodes.get(1)))
+        .thenReturn(new SCMNodeMetric(100L, 10L, 90L));
+    when(mockNodeManager.getNodeStat(datanodes.get(2)))
+        .thenReturn(new SCMNodeMetric(100L, 20L, 80L));
+    when(mockNodeManager.getNodeStat(datanodes.get(3)))
+        .thenReturn(new SCMNodeMetric(100L, 30L, 70L));
+
+    CapacityPipelineChoosePolicy policy =
+        new CapacityPipelineChoosePolicy(mockNodeManager);
+
+    // generate 4 pipelines, and every pipeline has 3 datanodes
+    //
+    //  pipeline0    dn1   dn2   dn3
+    //  pipeline1    dn0   dn2   dn3
+    //  pipeline2    dn0   dn1   dn3
+    //  pipeline3    dn0   dn1   dn2
+    //
+    // In the above scenario, pipeline0 vs pipeline1 runs through three rounds
+    // of comparisons, (dn3 <-> dn3) -> (dn2 <-> dn2 ) -> (dn1 <-> dn0),
+    // finally comparing dn0 and dn1, and dn0 wins, so pipeline1 is selected.
+    //
+    List<Pipeline> pipelines = new ArrayList<>();
+    for (int i = 0; i < 4; i++) {
+      List<DatanodeDetails> dns = new ArrayList<>();
+      for (int j = 0; j < datanodes.size(); j++) {
+        if (i != j) {
+          dns.add(datanodes.get(j));
+        }
+      }
+      Pipeline pipeline = MockPipeline.createPipeline(dns);
+      MockRatisPipelineProvider.markPipelineHealthy(pipeline);
+      pipelines.add(pipeline);
+    }
+
+    Map<Pipeline, Integer> selectedCount = new HashMap<>();
+    for (Pipeline pipeline : pipelines) {
+      selectedCount.put(pipeline, 0);
+    }
+    for (int i = 0; i < 1000; i++) {
+      // choosePipeline
+      Pipeline pipeline = policy.choosePipeline(pipelines, null);
+      Assertions.assertNotNull(pipeline);

Review Comment:
   nit: please add static imports
   
   ```suggestion
         assertNotNull(pipeline);
   ```



##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestCapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for the capacity pipeline choose policy.
+ */
+public class TestCapacityPipelineChoosePolicy {
+  @Test
+  public void choosePipeline() throws Exception {
+
+    // given 4 datanode
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    for (int i = 0; i < 4; i++) {
+      datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    }
+    //          dn0   dn1   dn2   dn3
+    // used       0   10    20    30
+    NodeManager mockNodeManager = Mockito.mock(NodeManager.class);
+    when(mockNodeManager.getNodeStat(datanodes.get(0)))
+        .thenReturn(new SCMNodeMetric(100L, 0L, 100L));
+    when(mockNodeManager.getNodeStat(datanodes.get(1)))
+        .thenReturn(new SCMNodeMetric(100L, 10L, 90L));
+    when(mockNodeManager.getNodeStat(datanodes.get(2)))
+        .thenReturn(new SCMNodeMetric(100L, 20L, 80L));
+    when(mockNodeManager.getNodeStat(datanodes.get(3)))
+        .thenReturn(new SCMNodeMetric(100L, 30L, 70L));
+
+    CapacityPipelineChoosePolicy policy =
+        new CapacityPipelineChoosePolicy(mockNodeManager);
+
+    // generate 4 pipelines, and every pipeline has 3 datanodes
+    //
+    //  pipeline0    dn1   dn2   dn3
+    //  pipeline1    dn0   dn2   dn3
+    //  pipeline2    dn0   dn1   dn3
+    //  pipeline3    dn0   dn1   dn2
+    //
+    // In the above scenario, pipeline0 vs pipeline1 runs through three rounds
+    // of comparisons, (dn3 <-> dn3) -> (dn2 <-> dn2 ) -> (dn1 <-> dn0),
+    // finally comparing dn0 and dn1, and dn0 wins, so pipeline1 is selected.
+    //
+    List<Pipeline> pipelines = new ArrayList<>();
+    for (int i = 0; i < 4; i++) {
+      List<DatanodeDetails> dns = new ArrayList<>();
+      for (int j = 0; j < datanodes.size(); j++) {
+        if (i != j) {
+          dns.add(datanodes.get(j));
+        }
+      }
+      Pipeline pipeline = MockPipeline.createPipeline(dns);
+      MockRatisPipelineProvider.markPipelineHealthy(pipeline);
+      pipelines.add(pipeline);
+    }
+
+    Map<Pipeline, Integer> selectedCount = new HashMap<>();
+    for (Pipeline pipeline : pipelines) {
+      selectedCount.put(pipeline, 0);
+    }
+    for (int i = 0; i < 1000; i++) {
+      // choosePipeline
+      Pipeline pipeline = policy.choosePipeline(pipelines, null);
+      Assertions.assertNotNull(pipeline);
+      selectedCount.put(pipeline, selectedCount.get(pipeline) + 1);
+    }
+
+    // The selected count from most to least should be :
+    // pipeline3 > pipeline2 > pipeline1 > pipeline0
+    for (int i = 0; i < pipelines.size(); i++) {
+      System.out.println("pipeline" + i + " selected count: "
+          + selectedCount.get(pipelines.get(i)));
+    }
+    Assertions.assertTrue(selectedCount.get(pipelines.get(3))
+        > selectedCount.get(pipelines.get(2)));

Review Comment:
   nit: please use AssertJ to get a better failure message instead of printing to standard output.
   
   ```suggestion
       assertThat(selectedCount.get(pipelines.get(3)))
           .isGreaterThan(selectedCount.get(pipelines.get(2)));
   ```
   
   + `import static org.assertj.core.api.Assertions.assertThat`
   + same for the other two assertions below



##########
hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/TestCapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.MockPipeline;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.when;
+
+/**
+ * Test for the capacity pipeline choose policy.
+ */
+public class TestCapacityPipelineChoosePolicy {
+  @Test
+  public void choosePipeline() throws Exception {
+
+    // given 4 datanode
+    List<DatanodeDetails> datanodes = new ArrayList<>();
+    for (int i = 0; i < 4; i++) {
+      datanodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    }
+    //          dn0   dn1   dn2   dn3
+    // used       0   10    20    30
+    NodeManager mockNodeManager = Mockito.mock(NodeManager.class);

Review Comment:
   nit: please add static import
   
   ```suggestion
       NodeManager mockNodeManager = mock(NodeManager.class);
   ```



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/RandomPipelineChoosePolicy.java:
##########
@@ -30,6 +31,9 @@
  */
 public class RandomPipelineChoosePolicy implements PipelineChoosePolicy {
 
+  public RandomPipelineChoosePolicy(NodeManager nodeManager) {
+  }

Review Comment:
   In addition to compatibility, falling back to the no-arg constructor would also let us avoid adding this.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/PipelineChoosePolicyFactory.java:
##########
@@ -64,17 +65,18 @@ public static PipelineChoosePolicy getPolicy(
         LOG.error("Met an exception while create pipeline choose policy "
             + "for the given class {}. Fallback to the default pipeline "
             + " choose policy {}", policyName, defaultPolicy, e);
-        return createPipelineChoosePolicyFromClass(defaultPolicy);
+        return createPipelineChoosePolicyFromClass(nodeManager, defaultPolicy);
       }
       throw e;
     }
   }
 
   private static PipelineChoosePolicy createPipelineChoosePolicyFromClass(
+      final NodeManager nodeManager,
       Class<? extends PipelineChoosePolicy> policyClass) throws SCMException {
     Constructor<? extends PipelineChoosePolicy> constructor;
     try {
-      constructor = policyClass.getDeclaredConstructor();
+      constructor = policyClass.getDeclaredConstructor(NodeManager.class);

Review Comment:
   I think we should make the change backward compatible by falling back to the no-arg constructor if this one fails.  Although it's not very likely, there may be custom implementations using the existing contract.
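
A minimal sketch of the fallback suggested here, not the code from the PR: try the `NodeManager` constructor first and fall back to the no-arg constructor when it does not exist. `NodeManager` and `PipelineChoosePolicy` below are empty stand-ins for the real Ozone types, and `LegacyPolicy`, `NodeAwarePolicy`, and `PolicyFactorySketch` are hypothetical names used only for illustration.

```java
// Hypothetical stand-ins for the real Ozone types, so the sketch compiles alone.
interface NodeManager { }

interface PipelineChoosePolicy { }

// A "legacy" custom policy that only declares the existing no-arg constructor.
class LegacyPolicy implements PipelineChoosePolicy {
  LegacyPolicy() { }
}

// A policy that needs the NodeManager, like the new capacity policy.
class NodeAwarePolicy implements PipelineChoosePolicy {
  final NodeManager nodeManager;

  NodeAwarePolicy(NodeManager nodeManager) {
    this.nodeManager = nodeManager;
  }
}

final class PolicyFactorySketch {
  private PolicyFactorySketch() { }

  // Prefer the (NodeManager) constructor; fall back to the no-arg one.
  static PipelineChoosePolicy create(NodeManager nodeManager,
      Class<? extends PipelineChoosePolicy> policyClass) {
    try {
      try {
        return policyClass.getDeclaredConstructor(NodeManager.class)
            .newInstance(nodeManager);
      } catch (NoSuchMethodException e) {
        // Existing contract: the policy only has a no-arg constructor.
        return policyClass.getDeclaredConstructor().newInstance();
      }
    } catch (ReflectiveOperationException e) {
      // Neither constructor exists, or construction itself failed.
      throw new IllegalStateException(
          "Cannot instantiate " + policyClass.getName(), e);
    }
  }
}
```

With this shape, a custom policy that declares only a no-arg constructor keeps loading, and policies that ignore the `NodeManager` would not need a new constructor.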



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/HealthyPipelineChoosePolicy.java:
##########
@@ -30,7 +31,11 @@
  */
 public class HealthyPipelineChoosePolicy implements PipelineChoosePolicy {
 
-  private PipelineChoosePolicy randomPolicy = new RandomPipelineChoosePolicy();
+  private PipelineChoosePolicy randomPolicy;
+
+  public HealthyPipelineChoosePolicy(NodeManager nodeManager) {
+    randomPolicy = new RandomPipelineChoosePolicy(nodeManager);
+  }

Review Comment:
   In addition to compatibility, falling back to the no-arg constructor would also let us avoid adding this.





[GitHub] [ozone] sodonnel commented on pull request #5354: HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space

Posted by "sodonnel (via GitHub)" <gi...@apache.org>.
sodonnel commented on PR #5354:
URL: https://github.com/apache/ozone/pull/5354#issuecomment-1733162487

   Please add some description to the PR about how this new policy would work, why it is needed etc.




Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #5354:
URL: https://github.com/apache/ozone/pull/5354#discussion_r1407628398


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The capacity pipeline choose policy that chooses pipeline
+ * with relatively more remaining datanode space.
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineChoosePolicy.class);
+
+  private final NodeManager nodeManager;
+
+  private final PipelineChoosePolicy healthPolicy;
+
+  public CapacityPipelineChoosePolicy(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
+    healthPolicy = new HealthyPipelineChoosePolicy(nodeManager);
+  }
+
+  @Override
+  public Pipeline choosePipeline(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    Pipeline pipeline1 = healthPolicy.choosePipeline(pipelineList, pri);
+    Pipeline pipeline2 = healthPolicy.choosePipeline(pipelineList, pri);
+
+    int result = new CapacityPipelineComparator(this)
+        .compare(pipeline1, pipeline2);
+
+    LOG.debug("Chosen the {} pipeline", result <= 0 ? "first" : "second");
+    return result <= 0 ? pipeline1 : pipeline2;
+  }
+
+  @Override
+  public int choosePipelineIndex(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    List<Pipeline> mutableList = new ArrayList<>(pipelineList);
+    Pipeline pipeline = choosePipeline(mutableList, pri);
+    return pipelineList.indexOf(pipeline);
+  }
+
+  /**
+   * Return a list of SCMNodeMetrics corresponding to the DataNodes in the
+   * pipeline, sorted in descending order based on scm used storage.
+   * @param pipeline pipeline
+   * @return sorted SCMNodeMetrics corresponding the pipeline
+   */
+  private Deque<SCMNodeMetric> getSortedNodeFromPipeline(Pipeline pipeline) {
+    Deque<SCMNodeMetric> sortedNodeStack = new ArrayDeque<>();
+    pipeline.getNodes().stream()
+        .map(nodeManager::getNodeStat)
+        .filter(Objects::nonNull)
+        .sorted()
+        .forEach(sortedNodeStack::push);
+    return sortedNodeStack;
+  }
+
+  static class CapacityPipelineComparator implements Comparator<Pipeline> {
+    private final CapacityPipelineChoosePolicy policy;
+
+    CapacityPipelineComparator(CapacityPipelineChoosePolicy policy) {
+      this.policy = policy;
+    }
+    @Override
+    public int compare(Pipeline p1, Pipeline p2) {
+      if (p1.getId().equals(p2.getId())) {
+        LOG.debug("Compare the same pipeline {}", p1);
+        return 0;
+      }
+      Deque<SCMNodeMetric> sortedNodes1 = policy.getSortedNodeFromPipeline(p1);
+      Deque<SCMNodeMetric> sortedNodes2 = policy.getSortedNodeFromPipeline(p2);
+
+      if (sortedNodes1.isEmpty() || sortedNodes2.isEmpty()) {

Review Comment:
   OK, I think this is also acceptable.





Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "whbing (via GitHub)" <gi...@apache.org>.
whbing commented on PR #5354:
URL: https://github.com/apache/ozone/pull/5354#issuecomment-1824309126

   CI passed in my branch: https://github.com/whbing/ozone/actions/runs/6956936286.




Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #5354:
URL: https://github.com/apache/ozone/pull/5354#discussion_r1407627084


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The capacity pipeline choose policy that chooses pipeline
+ * with relatively more remaining datanode space.
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineChoosePolicy.class);
+
+  private final NodeManager nodeManager;
+
+  private final PipelineChoosePolicy healthPolicy;
+
+  public CapacityPipelineChoosePolicy(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
+    healthPolicy = new HealthyPipelineChoosePolicy(nodeManager);
+  }
+
+  @Override
+  public Pipeline choosePipeline(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    Pipeline pipeline1 = healthPolicy.choosePipeline(pipelineList, pri);

Review Comment:
   >@xichen01 Thanks for review ! About the logic of selection, there are links to this original papers in [HDFS-11564](https://issues.apache.org/jira/browse/HDFS-11564). The algorithms of choosing 2 random nodes and then placing the container on the lower utilization node is discussed in great depth in this survey paper.
   https://pdfs.semanticscholar.org/3597/66cb47572028eb70c797115e987ff203e83f.pdf
   In addition, SCMContainerPlacementCapacity#chooseNode also uses this algorithm. So, I wonder if it is not necessary to find the pipeline with minimum storage every time?
   
   @whbing Understood. For a fairly balanced cluster, such as a new one, this strategy can work very well, providing similar loads to all DataNodes.
   However, for a significantly unbalanced cluster, such as one where new nodes have just been added, this strategy might be limited, especially in larger clusters.
   For that latter case (adding new nodes), though, we can also rebalance the cluster using a balancer.
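
The "choose 2 at random, keep the less used one" idea discussed above can be illustrated with a small simulation. This is a sketch under simplifying assumptions, not the PR's code: it models individual datanodes rather than pipelines, every write has size 1, and `TwoChoiceSketch` and `spread` are hypothetical names.

```java
import java.util.Random;

final class TwoChoiceSketch {
  private TwoChoiceSketch() { }

  /**
   * Places unit-size writes on nodes and returns (max used - min used).
   * With twoChoices == false a single random node is used; otherwise two
   * random candidates are drawn and the less used one receives the write.
   */
  static long spread(int nodes, int placements, boolean twoChoices,
      long seed) {
    long[] used = new long[nodes];
    Random random = new Random(seed);
    for (int i = 0; i < placements; i++) {
      int target = random.nextInt(nodes);
      if (twoChoices) {
        int second = random.nextInt(nodes);
        // Keep the first candidate on ties, switch only if strictly better.
        if (used[second] < used[target]) {
          target = second;
        }
      }
      used[target]++;
    }
    long min = Long.MAX_VALUE;
    long max = Long.MIN_VALUE;
    for (long u : used) {
      min = Math.min(min, u);
      max = Math.max(max, u);
    }
    return max - min;
  }
}
```

Calling `TwoChoiceSketch.spread(100, 100000, false, 42L)` and then the same call with `true` gives the usage spread under purely random and two-choice placement; the two-choice spread should come out much smaller. The sketch starts from an empty cluster, so it says nothing about the unbalanced case raised above.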





Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "sumitagrawl (via GitHub)" <gi...@apache.org>.
sumitagrawl commented on code in PR #5354:
URL: https://github.com/apache/ozone/pull/5354#discussion_r1406033791


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The capacity pipeline choose policy that chooses pipeline
+ * with relatively more remaining datanode space.
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {

Review Comment:
   This is also a random pipeline choice, so it could be renamed to RandomCapacityPipelineChoosePolicy.
   The class comment could also give a brief description: choose 2 random pipelines and return the one with lower usage.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The capacity pipeline choose policy that chooses pipeline
+ * with relatively more remaining datanode space.
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineChoosePolicy.class);
+
+  private final NodeManager nodeManager;
+
+  private final PipelineChoosePolicy healthPolicy;
+
+  public CapacityPipelineChoosePolicy(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
+    healthPolicy = new HealthyPipelineChoosePolicy(nodeManager);
+  }
+
+  @Override
+  public Pipeline choosePipeline(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    Pipeline pipeline1 = healthPolicy.choosePipeline(pipelineList, pri);
+    Pipeline pipeline2 = healthPolicy.choosePipeline(pipelineList, pri);
+
+    int result = new CapacityPipelineComparator(this)
+        .compare(pipeline1, pipeline2);
+
+    LOG.debug("Chosen the {} pipeline", result <= 0 ? "first" : "second");
+    return result <= 0 ? pipeline1 : pipeline2;
+  }
+
+  @Override
+  public int choosePipelineIndex(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    List<Pipeline> mutableList = new ArrayList<>(pipelineList);
+    Pipeline pipeline = choosePipeline(mutableList, pri);
+    return pipelineList.indexOf(pipeline);
+  }
+
+  /**
+   * Return a list of SCMNodeMetrics corresponding to the DataNodes in the
+   * pipeline, sorted in descending order based on scm used storage.
+   * @param pipeline pipeline
+   * @return sorted SCMNodeMetrics corresponding the pipeline
+   */
+  private Deque<SCMNodeMetric> getSortedNodeFromPipeline(Pipeline pipeline) {
+    Deque<SCMNodeMetric> sortedNodeStack = new ArrayDeque<>();
+    pipeline.getNodes().stream()
+        .map(nodeManager::getNodeStat)
+        .filter(Objects::nonNull)
+        .sorted()
+        .forEach(sortedNodeStack::push);
+    return sortedNodeStack;
+  }
+
+  static class CapacityPipelineComparator implements Comparator<Pipeline> {
+    private final CapacityPipelineChoosePolicy policy;
+
+    CapacityPipelineComparator(CapacityPipelineChoosePolicy policy) {
+      this.policy = policy;
+    }
+    @Override
+    public int compare(Pipeline p1, Pipeline p2) {
+      if (p1.getId().equals(p2.getId())) {
+        LOG.debug("Compare the same pipeline {}", p1);
+        return 0;
+      }
+      Deque<SCMNodeMetric> sortedNodes1 = policy.getSortedNodeFromPipeline(p1);
+      Deque<SCMNodeMetric> sortedNodes2 = policy.getSortedNodeFromPipeline(p2);
+
+      if (sortedNodes1.isEmpty() || sortedNodes2.isEmpty()) {
+        LOG.warn("Cannot obtain SCMNodeMetric in pipeline {} or {}", p1, p2);
+        return 0;
+      }
+      LOG.debug("Compare scmUsed weight in pipelines, first : {}, second : {}",
+          sortedNodes1, sortedNodes2);
+      // Compare the scmUsed of the first node in the two sorted node stacks
+      int result = sortedNodes1.pop().compareTo(sortedNodes2.pop());
+
+      if (result == 0 && !sortedNodes1.isEmpty() && !sortedNodes2.isEmpty()) {

Review Comment:
   Only 2 nodes are compared, not all the nodes in the pipeline. Any reason?
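
For illustration, a comparison that walks through every node could look like the sketch below. It is an assumption about one possible approach, not the comparator in this PR: each pipeline is reduced to an array of per-node used space, and `UsageCompareSketch` is a hypothetical name. The arrays are compared from the most used node downwards, and the first difference decides.

```java
import java.util.Arrays;

final class UsageCompareSketch {
  private UsageCompareSketch() { }

  /**
   * Compares two pipelines by the used space of their nodes.
   * Returns a negative value if the first pipeline is less used,
   * a positive value if it is more used, and 0 if no difference is found.
   */
  static int compare(long[] used1, long[] used2) {
    // Sort copies in ascending order so the inputs are left untouched.
    long[] a = used1.clone();
    long[] b = used2.clone();
    Arrays.sort(a);
    Arrays.sort(b);
    // Walk from the most used node towards the least used one.
    int i = a.length - 1;
    int j = b.length - 1;
    while (i >= 0 && j >= 0) {
      int result = Long.compare(a[i], b[j]);
      if (result != 0) {
        return result;
      }
      i--;
      j--;
    }
    return 0;
  }
}
```

With the used values from the unit test in this PR, pipeline0 (10, 20, 30) against pipeline1 (0, 20, 30) ties on 30 and on 20 and is decided only in the third round, 10 against 0, in favour of pipeline1.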



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The capacity pipeline choose policy that chooses pipeline
+ * with relatively more remaining datanode space.
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineChoosePolicy.class);
+
+  private final NodeManager nodeManager;
+
+  private final PipelineChoosePolicy healthPolicy;
+
+  public CapacityPipelineChoosePolicy(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
+    healthPolicy = new HealthyPipelineChoosePolicy(nodeManager);
+  }
+
+  @Override
+  public Pipeline choosePipeline(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    Pipeline pipeline1 = healthPolicy.choosePipeline(pipelineList, pri);

Review Comment:
   Do we have any test results for this algorithm compared with the random healthy node policy? Just to see the effectiveness of the algorithm.





Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "whbing (via GitHub)" <gi...@apache.org>.
whbing commented on code in PR #5354:
URL: https://github.com/apache/ozone/pull/5354#discussion_r1401647527


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The capacity pipeline choose policy that chooses pipeline
+ * with relatively more remaining datanode space.
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineChoosePolicy.class);
+
+  private final NodeManager nodeManager;
+
+  private PipelineChoosePolicy healthPolicy;
+
+  public CapacityPipelineChoosePolicy(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
+    healthPolicy = new HealthyPipelineChoosePolicy(nodeManager);
+  }
+
+  @Override
+  public Pipeline choosePipeline(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    Pipeline targetPipeline;
+    Pipeline pipeline1 = healthPolicy.choosePipeline(pipelineList, pri);
+    Pipeline pipeline2 = healthPolicy.choosePipeline(pipelineList, pri);
+    if (pipeline1.getId().equals(pipeline2.getId())) {
+      targetPipeline = pipeline1;
+      LOG.debug("Chosen pipeline = {}", targetPipeline);
+    } else {
+      SCMNodeMetric metric1 = getMaxUsageNodeFromPipeline(pipeline1);
+      SCMNodeMetric metric2 = getMaxUsageNodeFromPipeline(pipeline2);
+      if (metric1 == null || metric2 == null) {
+        LOG.warn("Can't get SCMNodeStat from pipeline: {} or {}.",
+            pipeline1, pipeline2);
+        targetPipeline = pipeline1;
+      } else {
+        targetPipeline =
+            !metric1.isGreater(metric2.get()) ? pipeline1 : pipeline2;

Review Comment:
   I rebased onto the master branch and added a commit, "add second compare logic". I tested it in a test environment, and the debug log is printed as follows.
   @siddhantsangwan If you have time, PTAL again. Thanks!





Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "xichen01 (via GitHub)" <gi...@apache.org>.
xichen01 commented on code in PR #5354:
URL: https://github.com/apache/ozone/pull/5354#discussion_r1403211230


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The capacity pipeline choose policy that chooses pipeline
+ * with relatively more remaining datanode space.
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineChoosePolicy.class);
+
+  private final NodeManager nodeManager;
+
+  private final PipelineChoosePolicy healthPolicy;
+
+  public CapacityPipelineChoosePolicy(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
+    healthPolicy = new HealthyPipelineChoosePolicy(nodeManager);
+  }
+
+  @Override
+  public Pipeline choosePipeline(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    Pipeline pipeline1 = healthPolicy.choosePipeline(pipelineList, pri);
+    Pipeline pipeline2 = healthPolicy.choosePipeline(pipelineList, pri);
+
+    int result = new CapacityPipelineComparator(this)
+        .compare(pipeline1, pipeline2);
+
+    LOG.debug("Chosen the {} pipeline", result <= 0 ? "first" : "second");
+    return result <= 0 ? pipeline1 : pipeline2;
+  }
+
+  @Override
+  public int choosePipelineIndex(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    List<Pipeline> mutableList = new ArrayList<>(pipelineList);
+    Pipeline pipeline = choosePipeline(mutableList, pri);
+    return pipelineList.indexOf(pipeline);
+  }
+
+  /**
+   * Return a list of SCMNodeMetrics corresponding to the DataNodes in the
+   * pipeline, sorted in descending order based on scm used storage.
+   * @param pipeline pipeline
+   * @return sorted SCMNodeMetrics corresponding the pipeline
+   */
+  private Deque<SCMNodeMetric> getSortedNodeFromPipeline(Pipeline pipeline) {
+    Deque<SCMNodeMetric> sortedNodeStack = new ArrayDeque<>();
+    pipeline.getNodes().stream()
+        .map(nodeManager::getNodeStat)
+        .filter(Objects::nonNull)
+        .sorted()
+        .forEach(sortedNodeStack::push);
+    return sortedNodeStack;
+  }
+
+  static class CapacityPipelineComparator implements Comparator<Pipeline> {
+    private final CapacityPipelineChoosePolicy policy;
+
+    CapacityPipelineComparator(CapacityPipelineChoosePolicy policy) {
+      this.policy = policy;
+    }
+    @Override
+    public int compare(Pipeline p1, Pipeline p2) {
+      if (p1.getId().equals(p2.getId())) {
+        LOG.debug("Compare the same pipeline {}", p1);
+        return 0;
+      }
+      Deque<SCMNodeMetric> sortedNodes1 = policy.getSortedNodeFromPipeline(p1);
+      Deque<SCMNodeMetric> sortedNodes2 = policy.getSortedNodeFromPipeline(p2);
+
+      if (sortedNodes1.isEmpty() || sortedNodes2.isEmpty()) {

Review Comment:
   What if one `sortedNodes` deque is empty but the other is not? Should we return the non-empty one?
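
   A minimal sketch of one way to handle that case, not the PR's actual code. It assumes each pipeline is reduced to a deque of per-node used-space values (plain `Long` stands in for `SCMNodeMetric`), ordered most-used first, and that an empty deque means no node stats were available. The class and method names are hypothetical. A negative result prefers the first pipeline, matching the `result <= 0 ? pipeline1 : pipeline2` convention in `choosePipeline`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical helper, not part of Ozone: Long values stand in for SCMNodeMetric.
final class EmptyAwareCompareSketch {

  private EmptyAwareCompareSketch() { }

  /** Pushes values given in ascending order, so the head ends up the largest. */
  static Deque<Long> dequeOf(long... ascendingUsed) {
    Deque<Long> deque = new ArrayDeque<>();
    for (long used : ascendingUsed) {
      deque.push(used);
    }
    return deque;
  }

  /**
   * Returns a negative value to prefer the first pipeline, a positive value
   * to prefer the second, and 0 when there is no basis for a preference.
   */
  static int compare(Deque<Long> used1, Deque<Long> used2) {
    if (used1.isEmpty() && used2.isEmpty()) {
      return 0;                 // no stats on either side
    }
    if (used1.isEmpty()) {
      return 1;                 // only the second has stats, prefer it
    }
    if (used2.isEmpty()) {
      return -1;                // only the first has stats, prefer it
    }
    // Compare the most-used nodes first, then the next ones on a tie.
    Iterator<Long> it1 = used1.iterator();
    Iterator<Long> it2 = used2.iterator();
    while (it1.hasNext() && it2.hasNext()) {
      int c = Long.compare(it1.next(), it2.next());
      if (c != 0) {
        return c;               // less used space wins
      }
    }
    return 0;
  }
}
```

   With this ordering a pipeline with stats always wins over one without. Whether that is desirable is a design choice, since a pipeline with missing stats is not necessarily full.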



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/choose/algorithms/CapacityPipelineChoosePolicy.java:
##########
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.choose.algorithms;
+
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeMetric;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Deque;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * The capacity pipeline choose policy that chooses pipeline
+ * with relatively more remaining datanode space.
+ */
+public class CapacityPipelineChoosePolicy implements PipelineChoosePolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineChoosePolicy.class);
+
+  private final NodeManager nodeManager;
+
+  private final PipelineChoosePolicy healthPolicy;
+
+  public CapacityPipelineChoosePolicy(NodeManager nodeManager) {
+    this.nodeManager = nodeManager;
+    healthPolicy = new HealthyPipelineChoosePolicy(nodeManager);
+  }
+
+  @Override
+  public Pipeline choosePipeline(List<Pipeline> pipelineList,
+      PipelineRequestInformation pri) {
+    Pipeline pipeline1 = healthPolicy.choosePipeline(pipelineList, pri);

Review Comment:
   In some clusters there may be close to a hundred pipelines, but here we compare only two `Pipeline`s.
   Doesn't this make the probability of selecting the largest (in capacity) Pipeline low?
   
   A possible solution is to add a configuration that determines how many Pipelines are compared at a time, expressed as a value in [0, 1]:
   - When it is 0, only one Pipeline is selected at a time, which is basically equivalent to the `RandomPipelineChoosePolicy`.
   - When it is 1, all Pipelines are compared, and the largest Pipeline overall is strictly chosen.
   
   
   PS: Even if this feature needs to be implemented, I think it can be done in another PR. Once this PR is merged, the current solution will work in a small cluster.
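
   A rough sketch of how such a setting could behave, under stated assumptions: the ratio is mapped to a sample size of at least one candidate, candidates are sampled without replacement (unlike the two independent picks in this PR), and the preferred candidate is the minimum under a caller-supplied comparator. The class, method, and parameter names are hypothetical and no such configuration key exists in the thread.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;

// Hypothetical sketch, not part of Ozone.
final class RatioChooseSketch {

  private RatioChooseSketch() { }

  /** Maps a ratio in [0, 1] to a sample size in [1, n]. */
  static int sampleSize(double ratio, int n) {
    if (ratio < 0.0 || ratio > 1.0) {
      throw new IllegalArgumentException("ratio must be in [0, 1]: " + ratio);
    }
    // ratio 0 still samples one candidate, i.e. a plain random choice.
    return Math.max(1, (int) Math.ceil(ratio * n));
  }

  /**
   * Samples sampleSize(ratio, n) candidates at random and returns the one
   * that the comparator orders first (the "best" one).
   */
  static <T> T choose(List<T> candidates, double ratio,
      Comparator<T> preference, Random random) {
    if (candidates.isEmpty()) {
      throw new IllegalArgumentException("no candidates");
    }
    List<T> shuffled = new ArrayList<>(candidates);
    Collections.shuffle(shuffled, random);
    int k = sampleSize(ratio, shuffled.size());
    return Collections.min(shuffled.subList(0, k), preference);
  }
}
```

   With ratio 0 this degenerates to a uniformly random pick, and with ratio 1 it always returns the best candidate, at the cost of comparing every pipeline on each allocation.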





[GitHub] [ozone] whbing commented on pull request #5354: HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space

Posted by "whbing (via GitHub)" <gi...@apache.org>.
whbing commented on PR #5354:
URL: https://github.com/apache/ozone/pull/5354#issuecomment-1733627777

   > Please add some description to the PR about how this new policy would work, why it is needed etc.
   
   Added to the description at the top of this page.




Re: [PR] HDDS-9345. Add CapacityPipelineChoosePolicy considering datanode storage space [ozone]

Posted by "whbing (via GitHub)" <gi...@apache.org>.
whbing commented on PR #5354:
URL: https://github.com/apache/ozone/pull/5354#issuecomment-1833483496

   I ran the test and got selection counts like:
   ```
   pipeline0 selected count: 62
   pipeline1 selected count: 205
   pipeline2 selected count: 308
   pipeline3 selected count: 425
   ```
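
   For comparison, a short sketch of what a better-of-two-random-picks scheme would predict. It assumes four pipelines indexed in order of increasing free space, that each pick is uniform and independent (so the same pipeline can be drawn twice), and 1000 selections, which matches the sum of the counts above. The thread does not show the test setup, so all of these are assumptions, and the class and method names are hypothetical. The pipeline with rank r out of n wins with probability (2r - 1) / n^2: both picks must have rank at most r, which happens with probability (r/n)^2, minus the case where both have rank at most r - 1.

```java
// Hypothetical sketch, not part of Ozone or of the PR's tests.
final class TwoChoiceExpectation {

  private TwoChoiceExpectation() { }

  /**
   * Probability that the candidate with the given rank (1 = least preferred,
   * n = most preferred) is the better of two uniform, independent picks.
   */
  static double winProbability(int rank, int n) {
    if (n <= 0 || rank < 1 || rank > n) {
      throw new IllegalArgumentException("need 1 <= rank <= n");
    }
    return (2.0 * rank - 1.0) / ((double) n * n);
  }

  /** Expected selection count per rank over the given number of trials. */
  static double[] expectedCounts(int n, int trials) {
    double[] counts = new double[n];
    for (int rank = 1; rank <= n; rank++) {
      counts[rank - 1] = trials * winProbability(rank, n);
    }
    return counts;
  }
}
```

   Under those assumptions `expectedCounts(4, 1000)` gives 62.5, 187.5, 312.5 and 437.5, which is close to the observed 62, 205, 308 and 425.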

