Posted to commits@ozone.apache.org by sa...@apache.org on 2020/10/19 03:44:22 UTC

[hadoop-ozone] branch master updated: HDDS-2922. Balance ratis leader distribution in datanodes (#1371)

This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fab5f2  HDDS-2922. Balance ratis leader distribution in datanodes (#1371)
8fab5f2 is described below

commit 8fab5f21c5e9bcfb1ce9b55a4179b8c958a53b25
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Mon Oct 19 11:44:10 2020 +0800

    HDDS-2922. Balance ratis leader distribution in datanodes (#1371)
---
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  |  19 ++
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |   3 +
 .../hadoop/hdds/scm/exceptions/SCMException.java   |   3 +-
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  |  42 +++-
 .../common/src/main/resources/ozone-default.xml    |  18 ++
 .../CreatePipelineCommandHandler.java              |  10 +-
 .../common/transport/server/XceiverServerSpi.java  |  10 +-
 .../transport/server/ratis/XceiverServerRatis.java |  27 ++-
 .../protocol/commands/CreatePipelineCommand.java   |  46 ++++-
 .../TestCreatePipelineCommandHandler.java          |   7 +-
 .../interface-client/src/main/proto/hdds.proto     |   1 +
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |   1 +
 .../src/main/proto/ScmServerProtocol.proto         |   1 +
 .../hdds/scm/pipeline/PipelineStateManager.java    |   3 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |  29 ++-
 .../algorithms/DefaultLeaderChoosePolicy.java      |  42 ++++
 .../choose/algorithms/LeaderChoosePolicy.java      |  55 ++++++
 .../algorithms/LeaderChoosePolicyFactory.java      |  75 +++++++
 .../algorithms/MinLeaderCountChoosePolicy.java     |  91 +++++++++
 .../leader/choose/algorithms/package-info.java     |  19 ++
 .../choose/algorithms/TestLeaderChoosePolicy.java  |  74 +++++++
 .../hdds/scm/pipeline/TestLeaderChoosePolicy.java  | 216 +++++++++++++++++++++
 .../TestRatisPipelineCreateAndDestroy.java         |   2 +-
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  |   4 +-
 .../ozone/om/TestOzoneManagerRocksDBLogging.java   |   2 +-
 25 files changed, 778 insertions(+), 22 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index 8325f09..324774d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.ratis;
 
 import java.io.IOException;
 import java.security.cert.X509Certificate;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -106,6 +107,11 @@ public final class RatisHelper {
     return new RaftPeer(toRaftPeerId(id), toRaftPeerAddressString(id));
   }
 
+  public static RaftPeer toRaftPeer(DatanodeDetails id, int priority) {
+    return new RaftPeer(
+        toRaftPeerId(id), toRaftPeerAddressString(id), priority);
+  }
+
   private static List<RaftPeer> toRaftPeers(Pipeline pipeline) {
     return toRaftPeers(pipeline.getNodes());
   }
@@ -126,6 +132,19 @@ public final class RatisHelper {
   }
 
   public static RaftGroup newRaftGroup(RaftGroupId groupId,
+      List<DatanodeDetails> peers, List<Integer> priorityList) {
+    assert peers.size() == priorityList.size();
+
+    final List<RaftPeer> newPeers = new ArrayList<>();
+    for (int i = 0; i < peers.size(); i++) {
+      RaftPeer peer = RatisHelper.toRaftPeer(peers.get(i), priorityList.get(i));
+      newPeers.add(peer);
+    }
+    return peers.isEmpty() ? RaftGroup.valueOf(groupId, Collections.emptyList())
+        : RaftGroup.valueOf(groupId, newPeers);
+  }
+
+  public static RaftGroup newRaftGroup(RaftGroupId groupId,
       Collection<DatanodeDetails> peers) {
     final List<RaftPeer> newPeers = peers.stream()
         .map(RatisHelper::toRaftPeer)
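
The new overloads above attach a Ratis election priority to each peer. As a minimal, hypothetical sketch of what a priority list produces -- assuming the Ratis version this commit builds against, where RaftPeer takes (id, address, priority) exactly as in toRaftPeer above, and with made-up peer ids and addresses:

    import org.apache.ratis.protocol.RaftGroup;
    import org.apache.ratis.protocol.RaftGroupId;
    import org.apache.ratis.protocol.RaftPeer;
    import org.apache.ratis.protocol.RaftPeerId;
    import java.util.Arrays;

    public class PriorityGroupSketch {
      public static void main(String[] args) {
        // Three peers; dn2 gets priority 1 so Ratis favors it in leader
        // election, while the others stay at the default priority 0.
        RaftPeer p1 = new RaftPeer(RaftPeerId.valueOf("dn1"), "host1:9858", 0);
        RaftPeer p2 = new RaftPeer(RaftPeerId.valueOf("dn2"), "host2:9858", 1);
        RaftPeer p3 = new RaftPeer(RaftPeerId.valueOf("dn3"), "host3:9858", 0);
        RaftGroup group = RaftGroup.valueOf(
            RaftGroupId.randomId(), Arrays.asList(p1, p2, p3));
        System.out.println(group);
      }
    }

Here dn2 would be favored when the group elects a leader, which is how the SCM's suggested leader is communicated to the datanodes.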
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 540d2c0..7b01e07 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -321,6 +321,9 @@ public final class ScmConfigKeys {
   public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT =
       "ozone.scm.pipeline.allocated.timeout";
 
+  public static final String OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY =
+      "ozone.scm.pipeline.leader-choose.policy";
+
   public static final String OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT =
       "5m";
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
index 0146eae..48a8e05 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
@@ -123,6 +123,7 @@ public class SCMException extends IOException {
     FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY,
     FAILED_TO_ALLOCATE_ENOUGH_BLOCKS,
     INTERNAL_ERROR,
-    FAILED_TO_INIT_PIPELINE_CHOOSE_POLICY
+    FAILED_TO_INIT_PIPELINE_CHOOSE_POLICY,
+    FAILED_TO_INIT_LEADER_CHOOSE_POLICY
   }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index d5c1024..a4787ef 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -61,6 +61,8 @@ public final class Pipeline {
   private UUID leaderId;
   // Timestamp for pipeline upon creation
   private Instant creationTimestamp;
+  // suggested leader id with high priority
+  private final UUID suggestedLeaderId;
 
   /**
    * The immutable properties of pipeline object is used in
@@ -69,13 +71,14 @@ public final class Pipeline {
    */
   private Pipeline(PipelineID id, ReplicationType type,
       ReplicationFactor factor, PipelineState state,
-      Map<DatanodeDetails, Long> nodeStatus) {
+      Map<DatanodeDetails, Long> nodeStatus, UUID suggestedLeaderId) {
     this.id = id;
     this.type = type;
     this.factor = factor;
     this.state = state;
     this.nodeStatus = nodeStatus;
     this.creationTimestamp = Instant.now();
+    this.suggestedLeaderId = suggestedLeaderId;
   }
 
   /**
@@ -124,6 +127,16 @@ public final class Pipeline {
   }
 
   /**
+   * Return the suggested leader id, which is given high priority among the
+   * DNs of the pipeline.
+   *
+   * @return Suggested LeaderId
+   */
+  public UUID getSuggestedLeaderId() {
+    return suggestedLeaderId;
+  }
+
+  /**
    * Set the creation timestamp. Only for protobuf now.
    *
    * @param creationTimestamp
@@ -278,6 +291,14 @@ public final class Pipeline {
       builder.setLeaderID128(uuid128);
     }
 
+    if (suggestedLeaderId != null) {
+      HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder()
+          .setMostSigBits(suggestedLeaderId.getMostSignificantBits())
+          .setLeastSigBits(suggestedLeaderId.getLeastSignificantBits())
+          .build();
+      builder.setSuggestedLeaderID(uuid128);
+    }
+
     // To save the message size on wire, only transfer the node order based on
     // network topology
     List<DatanodeDetails> nodes = nodesInOrder.get();
@@ -315,12 +336,20 @@ public final class Pipeline {
       leaderId = UUID.fromString(pipeline.getLeaderID());
     }
 
+    UUID suggestedLeaderId = null;
+    if (pipeline.hasSuggestedLeaderID()) {
+      HddsProtos.UUID uuid = pipeline.getSuggestedLeaderID();
+      suggestedLeaderId =
+          new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits());
+    }
+
     return new Builder().setId(PipelineID.getFromProtobuf(pipeline.getId()))
         .setFactor(pipeline.getFactor())
         .setType(pipeline.getType())
         .setState(PipelineState.fromProtobuf(pipeline.getState()))
         .setNodes(nodes)
         .setLeaderId(leaderId)
+        .setSuggestedLeaderId(suggestedLeaderId)
         .setNodesInOrder(pipeline.getMemberOrdersList())
         .setCreateTimestamp(pipeline.getCreationTimeStamp())
         .build();
@@ -392,6 +421,7 @@ public final class Pipeline {
     private List<DatanodeDetails> nodesInOrder = null;
     private UUID leaderId = null;
     private Instant creationTimestamp = null;
+    private UUID suggestedLeaderId = null;
 
     public Builder() {}
 
@@ -404,6 +434,7 @@ public final class Pipeline {
       this.nodesInOrder = pipeline.nodesInOrder.get();
       this.leaderId = pipeline.getLeaderId();
       this.creationTimestamp = pipeline.getCreationTimestamp();
+      this.suggestedLeaderId = pipeline.getSuggestedLeaderId();
     }
 
     public Builder setId(PipelineID id1) {
@@ -447,13 +478,19 @@ public final class Pipeline {
       return this;
     }
 
+    public Builder setSuggestedLeaderId(UUID uuid) {
+      this.suggestedLeaderId = uuid;
+      return this;
+    }
+
     public Pipeline build() {
       Preconditions.checkNotNull(id);
       Preconditions.checkNotNull(type);
       Preconditions.checkNotNull(factor);
       Preconditions.checkNotNull(state);
       Preconditions.checkNotNull(nodeStatus);
-      Pipeline pipeline = new Pipeline(id, type, factor, state, nodeStatus);
+      Pipeline pipeline =
+          new Pipeline(id, type, factor, state, nodeStatus, suggestedLeaderId);
       pipeline.setLeaderId(leaderId);
       // overwrite with original creationTimestamp
       if (creationTimestamp != null) {
@@ -484,6 +521,7 @@ public final class Pipeline {
         // This branch is for pipeline clone
         pipeline.setNodesInOrder(nodesInOrder);
       }
+
       return pipeline;
     }
   }
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 026c0a8..4853978 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -858,6 +858,24 @@
     </description>
   </property>
   <property>
+    <name>ozone.scm.pipeline.leader-choose.policy</name>
+    <value>
+      org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.DefaultLeaderChoosePolicy
+    </value>
+    <tag>OZONE, SCM, PIPELINE</tag>
+    <description>
+      The policy used for choosing desired leader for pipeline creation.
+      There are two policies supporting now: DefaultLeaderChoosePolicy, MinLeaderCountChoosePolicy.
+      org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.DefaultLeaderChoosePolicy
+      implements a policy that choose leader without depending on priority.
+      org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.MinLeaderCountChoosePolicy
+      implements a policy that choose leader which has the minimum exist leader count.
+      In the future, we need to add policies which consider:
+      1. resource, the datanode with the most abundant cpu and memory can be made the leader
+      2. topology, the datanode nearest to the client can be made the leader
+    </description>
+  </property>
+  <property>
     <name>ozone.scm.container.size</name>
     <value>5GB</value>
     <tag>OZONE, PERFORMANCE, MANAGEMENT</tag>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
index e435f7b..8f41fe9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
@@ -17,8 +17,8 @@
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
 import java.io.IOException;
-import java.util.Collection;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -82,18 +82,20 @@ public class CreatePipelineCommandHandler implements CommandHandler {
     final CreatePipelineCommandProto createCommand =
         ((CreatePipelineCommand)command).getProto();
     final HddsProtos.PipelineID pipelineID = createCommand.getPipelineID();
-    final Collection<DatanodeDetails> peers =
+    final List<DatanodeDetails> peers =
         createCommand.getDatanodeList().stream()
             .map(DatanodeDetails::getFromProtoBuf)
             .collect(Collectors.toList());
+    final List<Integer> priorityList = createCommand.getPriorityList();
 
     try {
       XceiverServerSpi server = ozoneContainer.getWriteChannel();
       if (!server.isExist(pipelineID)) {
         final RaftGroupId groupId = RaftGroupId.valueOf(
             PipelineID.getFromProtobuf(pipelineID).getId());
-        final RaftGroup group = RatisHelper.newRaftGroup(groupId, peers);
-        server.addGroup(pipelineID, peers);
+        final RaftGroup group =
+            RatisHelper.newRaftGroup(groupId, peers, priorityList);
+        server.addGroup(pipelineID, peers, priorityList);
         peers.stream().filter(
             d -> !d.getUuid().equals(dn.getUuid()))
             .forEach(d -> {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
index d8dfefd..4805612 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.MetadataStorageReportProto;
 import java.io.IOException;
-import java.util.Collection;
 import java.util.List;
 
 /** A server endpoint that acts as the communication layer for Ozone
@@ -68,9 +67,16 @@ public interface XceiverServerSpi {
    * Join a new pipeline.
    */
   default void addGroup(HddsProtos.PipelineID pipelineId,
-      Collection<DatanodeDetails> peers) throws IOException {
+      List<DatanodeDetails> peers) throws IOException {
   }
 
+  /**
+   * Join a new pipeline with priority.
+   */
+  default void addGroup(HddsProtos.PipelineID pipelineId,
+      List<DatanodeDetails> peers,
+      List<Integer> priorityList) throws IOException {
+  }
 
   /**
    * Exit a pipeline.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 573e681..0b0756c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -115,6 +116,9 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   private static final Logger LOG = LoggerFactory
       .getLogger(XceiverServerRatis.class);
   private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
+  private static final List<Integer> DEFAULT_PRIORITY_LIST =
+      new ArrayList<>(
+          Collections.nCopies(HddsProtos.ReplicationFactor.THREE_VALUE, 0));
 
   private static long nextCallId() {
     return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
@@ -711,10 +715,23 @@ public final class XceiverServerRatis implements XceiverServerSpi {
 
   @Override
   public void addGroup(HddsProtos.PipelineID pipelineId,
-      Collection<DatanodeDetails> peers) throws IOException {
+      List<DatanodeDetails> peers) throws IOException {
+    if (peers.size() == getDefaultPriorityList().size()) {
+      addGroup(pipelineId, peers, getDefaultPriorityList());
+    } else {
+      addGroup(pipelineId, peers,
+          new ArrayList<>(Collections.nCopies(peers.size(), 0)));
+    }
+  }
+
+  @Override
+  public void addGroup(HddsProtos.PipelineID pipelineId,
+      List<DatanodeDetails> peers,
+      List<Integer> priorityList) throws IOException {
     final PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineId);
     final RaftGroupId groupId = RaftGroupId.valueOf(pipelineID.getId());
-    final RaftGroup group = RatisHelper.newRaftGroup(groupId, peers);
+    final RaftGroup group =
+        RatisHelper.newRaftGroup(groupId, peers, priorityList);
     GroupManagementRequest request = GroupManagementRequest.newAdd(
         clientId, server.getId(), nextCallId(), group);
 
@@ -864,4 +881,10 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     return ImmutableList.copyOf(executors);
   }
 
+  /**
+   * @return the default priority list.
+   */
+  public static List<Integer> getDefaultPriorityList() {
+    return DEFAULT_PRIORITY_LIST;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
index 9e22cbc..6fdb4ce 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
@@ -25,7 +25,10 @@ import org.apache.hadoop.hdds.protocol.proto.
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverServerRatis;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -35,10 +38,14 @@ import java.util.stream.Collectors;
 public class CreatePipelineCommand
     extends SCMCommand<CreatePipelineCommandProto> {
 
+  private static final Integer HIGH_PRIORITY = 1;
+  private static final Integer LOW_PRIORITY = 0;
+
   private final PipelineID pipelineID;
   private final ReplicationFactor factor;
   private final ReplicationType type;
   private final List<DatanodeDetails> nodelist;
+  private final List<Integer> priorityList;
 
   public CreatePipelineCommand(final PipelineID pipelineID,
       final ReplicationType type, final ReplicationFactor factor,
@@ -48,16 +55,49 @@ public class CreatePipelineCommand
     this.factor = factor;
     this.type = type;
     this.nodelist = datanodeList;
+    if (datanodeList.size() ==
+        XceiverServerRatis.getDefaultPriorityList().size()) {
+      this.priorityList = XceiverServerRatis.getDefaultPriorityList();
+    } else {
+      this.priorityList =
+          new ArrayList<>(Collections.nCopies(datanodeList.size(), 0));
+    }
+  }
+
+  public CreatePipelineCommand(final PipelineID pipelineID,
+      final ReplicationType type, final ReplicationFactor factor,
+      final List<DatanodeDetails> datanodeList,
+      final DatanodeDetails suggestedLeader) {
+    super();
+    this.pipelineID = pipelineID;
+    this.factor = factor;
+    this.type = type;
+    this.nodelist = datanodeList;
+    this.priorityList = new ArrayList<>();
+    initPriorityList(datanodeList, suggestedLeader);
+  }
+
+  private void initPriorityList(
+      List<DatanodeDetails> dns, DatanodeDetails suggestedLeader) {
+    for (DatanodeDetails dn : dns) {
+      if (dn.equals(suggestedLeader)) {
+        priorityList.add(HIGH_PRIORITY);
+      } else {
+        priorityList.add(LOW_PRIORITY);
+      }
+    }
   }
 
   public CreatePipelineCommand(long cmdId, final PipelineID pipelineID,
       final ReplicationType type, final ReplicationFactor factor,
-      final List<DatanodeDetails> datanodeList) {
+      final List<DatanodeDetails> datanodeList,
+      final List<Integer> priorityList) {
     super(cmdId);
     this.pipelineID = pipelineID;
     this.factor = factor;
     this.type = type;
     this.nodelist = datanodeList;
+    this.priorityList = priorityList;
   }
 
   /**
@@ -80,6 +120,7 @@ public class CreatePipelineCommand
         .addAllDatanode(nodelist.stream()
             .map(DatanodeDetails::getProtoBufMessage)
             .collect(Collectors.toList()))
+        .addAllPriority(priorityList)
         .build();
   }
 
@@ -91,7 +132,8 @@ public class CreatePipelineCommand
         createPipelineProto.getType(), createPipelineProto.getFactor(),
         createPipelineProto.getDatanodeList().stream()
             .map(DatanodeDetails::getFromProtoBuf)
-            .collect(Collectors.toList()));
+            .collect(Collectors.toList()),
+        createPipelineProto.getPriorityList());
   }
 
   public PipelineID getPipelineID() {
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
index 8ee6ac7..ede0b94 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
@@ -48,7 +48,9 @@ import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -113,8 +115,11 @@ public class TestCreatePipelineCommandHandler {
     commandHandler.handle(command, ozoneContainer, stateContext,
         connectionManager);
 
+    List<Integer> priorityList =
+        new ArrayList<>(Collections.nCopies(datanodes.size(), 0));
+
     Mockito.verify(writeChanel, Mockito.times(1))
-        .addGroup(pipelineID.getProtobuf(), datanodes);
+        .addGroup(pipelineID.getProtobuf(), datanodes, priorityList);
 
     Mockito.verify(raftClient, Mockito.times(2))
         .groupAdd(Mockito.any(RaftGroup.class), Mockito.any(RaftPeerId.class));
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index 0c9b261..b43a74c 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -100,6 +100,7 @@ message Pipeline {
     optional string leaderID = 6;
     repeated uint32 memberOrders = 7;
     optional uint64 creationTimeStamp = 8;
+    optional UUID suggestedLeaderID = 9;
     // TODO(runzhiwang): when leaderID is gone, specify 6 as the index of leaderID128
     optional UUID leaderID128 = 100;
 }
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index 1dc4bcd..4f610ff 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -380,6 +380,7 @@ message CreatePipelineCommandProto {
   required ReplicationFactor factor = 3;
   repeated DatanodeDetailsProto datanode = 4;
   required int64 cmdId = 5;
+  repeated int32 priority = 6;
 }
 
 /**
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
index 682d4d9..7d59bd7 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerProtocol.proto
@@ -115,6 +115,7 @@ enum Status {
   FAILED_TO_ALLOCATE_ENOUGH_BLOCKS = 27;
   INTERNAL_ERROR = 29;
   FAILED_TO_INIT_PIPELINE_CHOOSE_POLICY = 30;
+  FAILED_TO_INIT_LEADER_CHOOSE_POLICY = 31;
 }
 
 /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index bb56a03..8bc5bd5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -59,7 +59,8 @@ public class PipelineStateManager {
     pipelineStateMap.addContainerToPipeline(pipelineId, containerID);
   }
 
-  Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException {
+  public Pipeline getPipeline(PipelineID pipelineID)
+      throws PipelineNotFoundException {
     return pipelineStateMap.getPipeline(pipelineID);
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index e39f141..830db18 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -31,11 +31,14 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
+import org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.LeaderChoosePolicy;
+import org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.LeaderChoosePolicyFactory;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 
+import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,8 +55,10 @@ public class RatisPipelineProvider extends PipelineProvider {
   private final PipelinePlacementPolicy placementPolicy;
   private int pipelineNumberLimit;
   private int maxPipelinePerDatanode;
+  private final LeaderChoosePolicy leaderChoosePolicy;
 
-  RatisPipelineProvider(NodeManager nodeManager,
+  @VisibleForTesting
+  public RatisPipelineProvider(NodeManager nodeManager,
       PipelineStateManager stateManager, ConfigurationSource conf,
       EventPublisher eventPublisher) {
     super(nodeManager, stateManager);
@@ -67,6 +72,12 @@ public class RatisPipelineProvider extends PipelineProvider {
     String dnLimit = conf.get(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT);
     this.maxPipelinePerDatanode = dnLimit == null ? 0 :
         Integer.parseInt(dnLimit);
+    try {
+      leaderChoosePolicy = LeaderChoosePolicyFactory
+          .getPolicy(conf, nodeManager, stateManager);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
   }
 
   private boolean exceedPipelineNumberLimit(ReplicationFactor factor) {
@@ -98,8 +109,14 @@ public class RatisPipelineProvider extends PipelineProvider {
     return false;
   }
 
+  @VisibleForTesting
+  public LeaderChoosePolicy getLeaderChoosePolicy() {
+    return leaderChoosePolicy;
+  }
+
   @Override
-  public Pipeline create(ReplicationFactor factor) throws IOException {
+  public synchronized Pipeline create(ReplicationFactor factor)
+      throws IOException {
     if (exceedPipelineNumberLimit(factor)) {
       throw new SCMException("Ratis pipeline number meets the limit: " +
           pipelineNumberLimit + " factor : " +
@@ -121,16 +138,22 @@ public class RatisPipelineProvider extends PipelineProvider {
       throw new IllegalStateException("Unknown factor: " + factor.name());
     }
 
+    DatanodeDetails suggestedLeader = leaderChoosePolicy.chooseLeader(dns);
+
     Pipeline pipeline = Pipeline.newBuilder()
         .setId(PipelineID.randomId())
         .setState(PipelineState.ALLOCATED)
         .setType(ReplicationType.RATIS)
         .setFactor(factor)
         .setNodes(dns)
+        .setSuggestedLeaderId(
+            suggestedLeader != null ? suggestedLeader.getUuid() : null)
         .build();
 
     // Send command to datanodes to create pipeline
-    final CreatePipelineCommand createCommand =
+    final CreatePipelineCommand createCommand = suggestedLeader != null ?
+        new CreatePipelineCommand(pipeline.getId(), pipeline.getType(),
+            factor, dns, suggestedLeader) :
         new CreatePipelineCommand(pipeline.getId(), pipeline.getType(),
             factor, dns);
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java
new file mode 100644
index 0000000..415cf10
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
+
+import java.util.List;
+
+/**
+ * The default leader choose policy.
+ * It does not choose a leader, so all nodes keep the same priority
+ * and Ratis elects the leader without depending on priority.
+ */
+public class DefaultLeaderChoosePolicy extends LeaderChoosePolicy {
+
+  public DefaultLeaderChoosePolicy(
+      NodeManager nodeManager, PipelineStateManager pipelineStateManager) {
+    super(nodeManager, pipelineStateManager);
+  }
+
+  @Override
+  public DatanodeDetails chooseLeader(List<DatanodeDetails> dns) {
+    return null;
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java
new file mode 100644
index 0000000..04c155b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
+
+import java.util.List;
+
+/**
+ * A {@link LeaderChoosePolicy} supports choosing a leader from a datanode list.
+ */
+public abstract class LeaderChoosePolicy {
+
+  private final NodeManager nodeManager;
+  private final PipelineStateManager pipelineStateManager;
+
+  public LeaderChoosePolicy(
+      NodeManager nodeManager, PipelineStateManager pipelineStateManager) {
+    this.nodeManager = nodeManager;
+    this.pipelineStateManager = pipelineStateManager;
+  }
+
+  /**
+   * Given an initial list of datanodes, return one of the datanodes.
+   *
+   * @param dns list of datanodes.
+   * @return one of the datanodes.
+   */
+  public abstract DatanodeDetails chooseLeader(List<DatanodeDetails> dns);
+
+  protected NodeManager getNodeManager() {
+    return nodeManager;
+  }
+
+  protected PipelineStateManager getPipelineStateManager() {
+    return pipelineStateManager;
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java
new file mode 100644
index 0000000..8e1a0ff
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms;
+
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Constructor;
+
+/**
+ * A factory to create leader choose policy instance based on configuration
+ * property {@link ScmConfigKeys#OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY}.
+ */
+public final class LeaderChoosePolicyFactory {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LeaderChoosePolicyFactory.class);
+
+  private static final Class<? extends LeaderChoosePolicy>
+      OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY_DEFAULT =
+      MinLeaderCountChoosePolicy.class;
+
+  private LeaderChoosePolicyFactory() {
+  }
+
+
+  public static LeaderChoosePolicy getPolicy(
+      ConfigurationSource conf, final NodeManager nodeManager,
+      final PipelineStateManager pipelineStateManager) throws SCMException {
+    final Class<? extends LeaderChoosePolicy> policyClass = conf
+        .getClass(ScmConfigKeys.OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY,
+            OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY_DEFAULT,
+            LeaderChoosePolicy.class);
+    Constructor<? extends LeaderChoosePolicy> constructor;
+    try {
+      constructor = policyClass.getDeclaredConstructor(NodeManager.class,
+          PipelineStateManager.class);
+      LOG.info("Create leader choose policy of type {}",
+          policyClass.getCanonicalName());
+    } catch (NoSuchMethodException e) {
+      String msg = "Failed to find constructor(NodeManager, " +
+          "PipelineStateManager) for class " +
+          policyClass.getCanonicalName();
+      LOG.error(msg);
+      throw new SCMException(msg,
+          SCMException.ResultCodes.FAILED_TO_INIT_LEADER_CHOOSE_POLICY);
+    }
+
+    try {
+      return constructor.newInstance(nodeManager, pipelineStateManager);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to instantiate class " +
+          policyClass.getCanonicalName() + ": " + e.getMessage(), e);
+    }
+  }
+}
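
Since the factory resolves the configured class reflectively and requires a (NodeManager, PipelineStateManager) constructor, custom policies can be plugged in without modifying the SCM. A hypothetical example (the class name and behavior are illustrative only, not part of this commit):

    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.node.NodeManager;
    import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
    import org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms.LeaderChoosePolicy;

    public class RandomLeaderChoosePolicy extends LeaderChoosePolicy {

      // The factory looks up exactly this constructor signature.
      public RandomLeaderChoosePolicy(NodeManager nodeManager,
          PipelineStateManager pipelineStateManager) {
        super(nodeManager, pipelineStateManager);
      }

      @Override
      public DatanodeDetails chooseLeader(List<DatanodeDetails> dns) {
        // Suggest an arbitrary member; the SCM records it as the pipeline's
        // suggested leader and gives it the high Ratis priority.
        return dns.get(ThreadLocalRandom.current().nextInt(dns.size()));
      }
    }

Such a class would be enabled by setting ozone.scm.pipeline.leader-choose.policy to its fully qualified name.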
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java
new file mode 100644
index 0000000..d4068b9
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The minimum leader count choose policy, which chooses as leader the
+ * datanode with the minimum existing leader count.
+ */
+public class MinLeaderCountChoosePolicy extends LeaderChoosePolicy {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(MinLeaderCountChoosePolicy.class);
+
+  public MinLeaderCountChoosePolicy(
+      NodeManager nodeManager, PipelineStateManager pipelineStateManager) {
+    super(nodeManager, pipelineStateManager);
+  }
+
+  @Override
+  public DatanodeDetails chooseLeader(List<DatanodeDetails> dns) {
+    Map<DatanodeDetails, Integer> suggestedLeaderCount =
+        getSuggestedLeaderCount(
+            dns, getNodeManager(), getPipelineStateManager());
+    int minLeaderCount = Integer.MAX_VALUE;
+    DatanodeDetails suggestedLeader = null;
+
+    for (Map.Entry<DatanodeDetails, Integer> entry :
+        suggestedLeaderCount.entrySet()) {
+      if (entry.getValue() < minLeaderCount) {
+        minLeaderCount = entry.getValue();
+        suggestedLeader = entry.getKey();
+      }
+    }
+
+    return suggestedLeader;
+  }
+
+  private Map<DatanodeDetails, Integer> getSuggestedLeaderCount(
+      List<DatanodeDetails> dns, NodeManager nodeManager,
+      PipelineStateManager pipelineStateManager) {
+    Map<DatanodeDetails, Integer> suggestedLeaderCount = new HashMap<>();
+    for (DatanodeDetails dn : dns) {
+      suggestedLeaderCount.put(dn, 0);
+
+      Set<PipelineID> pipelineIDSet = nodeManager.getPipelines(dn);
+      for (PipelineID pipelineID : pipelineIDSet) {
+        try {
+          Pipeline pipeline = pipelineStateManager.getPipeline(pipelineID);
+          if (!pipeline.isClosed()
+              && dn.getUuid().equals(pipeline.getSuggestedLeaderId())) {
+            suggestedLeaderCount.put(dn, suggestedLeaderCount.get(dn) + 1);
+          }
+        } catch (PipelineNotFoundException e) {
+          LOG.debug("Pipeline not found in pipeline state manager : {}",
+              pipelineID, e);
+        }
+      }
+    }
+
+    return suggestedLeaderCount;
+  }
+}
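
The selection above is a plain minimum-by-value scan over per-datanode counts of open pipelines that already suggest the node as leader. A self-contained sketch of the same loop, with String keys standing in for DatanodeDetails and hypothetical counts:

    import java.util.HashMap;
    import java.util.Map;

    public class MinCountSketch {
      public static void main(String[] args) {
        // Hypothetical counts: how many open pipelines already suggest
        // each datanode as leader.
        Map<String, Integer> suggestedLeaderCount = new HashMap<>();
        suggestedLeaderCount.put("dn1", 2);
        suggestedLeaderCount.put("dn2", 0);
        suggestedLeaderCount.put("dn3", 1);

        int minLeaderCount = Integer.MAX_VALUE;
        String suggestedLeader = null;
        for (Map.Entry<String, Integer> entry :
            suggestedLeaderCount.entrySet()) {
          if (entry.getValue() < minLeaderCount) {
            minLeaderCount = entry.getValue();
            suggestedLeader = entry.getKey();
          }
        }
        System.out.println(suggestedLeader); // dn2, the least-loaded node
      }
    }

Ties are broken by HashMap iteration order, so among equally loaded datanodes the pick is effectively arbitrary.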
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/package-info.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/package-info.java
new file mode 100644
index 0000000..e29369a
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms;
+// Various leader choosing algorithms.
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
new file mode 100644
index 0000000..53905e7
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.ScmConfig;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
+import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineProvider;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Unit tests for {@link LeaderChoosePolicy}.
+ */
+public class TestLeaderChoosePolicy {
+  private OzoneConfiguration conf;
+
+  private ScmConfig scmConfig;
+
+  @Before
+  public void setup() {
+    //initialize network topology instance
+    conf = new OzoneConfiguration();
+    scmConfig = conf.getObject(ScmConfig.class);
+  }
+
+  @Test
+  public void testDefaultPolicy() {
+    RatisPipelineProvider ratisPipelineProvider = new RatisPipelineProvider(
+        mock(NodeManager.class),
+        mock(PipelineStateManager.class),
+        conf,
+        mock(EventPublisher.class));
+    Assert.assertSame(
+        ratisPipelineProvider.getLeaderChoosePolicy().getClass(),
+        DefaultLeaderChoosePolicy.class);
+  }
+
+  @Test(expected = RuntimeException.class)
+  public void testClassNotImplemented() {
+    // set a class not implemented
+    conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY,
+        "org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms" +
+            ".HelloWorld");
+    new RatisPipelineProvider(
+        mock(NodeManager.class),
+        mock(PipelineStateManager.class),
+        conf,
+        mock(EventPublisher.class));
+
+    // expecting exception
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestLeaderChoosePolicy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestLeaderChoosePolicy.java
new file mode 100644
index 0000000..ecf1c2f
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestLeaderChoosePolicy.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.TimeoutException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
+
+/**
+ * Tests for LeaderChoosePolicy.
+ */
+@Ignore
+public class TestLeaderChoosePolicy {
+
+  private static MiniOzoneCluster cluster;
+  private OzoneConfiguration conf = new OzoneConfiguration();
+  private static PipelineManager pipelineManager;
+
+  public void init(int numDatanodes, int datanodePipelineLimit)
+      throws Exception {
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+        GenericTestUtils.getRandomizedTempPath());
+    conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, datanodePipelineLimit);
+
+    cluster = MiniOzoneCluster.newBuilder(conf)
+            .setNumDatanodes(numDatanodes)
+            .setTotalPipelineNumLimit(numDatanodes + numDatanodes/3)
+            .setHbInterval(2000)
+            .setHbProcessorInterval(1000)
+            .build();
+    cluster.waitForClusterToBeReady();
+    StorageContainerManager scm = cluster.getStorageContainerManager();
+    pipelineManager = scm.getPipelineManager();
+  }
+
+  @After
+  public void cleanup() {
+    cluster.shutdown();
+  }
+
+  private void checkLeaderBalance(int dnNum, int leaderNumOfEachDn)
+      throws Exception {
+    List<Pipeline> pipelines = pipelineManager
+        .getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN);
+
+    for (Pipeline pipeline : pipelines) {
+      LambdaTestUtils.await(30000, 500, () ->
+          pipeline.getLeaderId().equals(pipeline.getSuggestedLeaderId()));
+    }
+
+    Map<UUID, Integer> leaderCount = new HashMap<>();
+    for (Pipeline pipeline : pipelines) {
+      UUID leader = pipeline.getLeaderId();
+      if (!leaderCount.containsKey(leader)) {
+        leaderCount.put(leader, 0);
+      }
+
+      leaderCount.put(leader, leaderCount.get(leader) + 1);
+    }
+
+    Assert.assertTrue(leaderCount.size() == dnNum);
+    for (UUID key : leaderCount.keySet()) {
+      Assert.assertTrue(leaderCount.get(key) == leaderNumOfEachDn);
+    }
+  }
+
+  @Test(timeout = 360000)
+  public void testRestoreSuggestedLeader() throws Exception {
+    conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false);
+    conf.set(OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY,
+        "org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms" +
+            ".MinLeaderCountChoosePolicy");
+    int dnNum = 3;
+    int dnPipelineLimit = 3;
+    int leaderNumOfEachDn = dnPipelineLimit / dnNum;
+    int pipelineNum = 3;
+
+    init(dnNum, dnPipelineLimit);
+    // make sure pipelines are created
+    waitForPipelines(pipelineNum);
+    // No Factor ONE pipeline is auto created.
+    Assert.assertEquals(0, pipelineManager.getPipelines(
+        HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.ONE).size());
+
+    // pipelineNum pipelines in 3 datanodes,
+    // each datanode has leaderNumOfEachDn leaders after balance
+    checkLeaderBalance(dnNum, leaderNumOfEachDn);
+    List<Pipeline> pipelinesBeforeRestart =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipelines();
+
+    cluster.restartStorageContainerManager(true);
+
+    checkLeaderBalance(dnNum, leaderNumOfEachDn);
+    List<Pipeline> pipelinesAfterRestart =
+        cluster.getStorageContainerManager().getPipelineManager()
+            .getPipelines();
+
+    Assert.assertEquals(
+        pipelinesBeforeRestart.size(), pipelinesAfterRestart.size());
+
+    for (Pipeline p : pipelinesBeforeRestart) {
+      boolean equal = false;
+      for (Pipeline q : pipelinesAfterRestart) {
+        if (p.getId().equals(q.getId())
+            && p.getSuggestedLeaderId().equals(q.getSuggestedLeaderId())) {
+          equal = true;
+        }
+      }
+
+      Assert.assertTrue(equal);
+    }
+  }
+
+  @Test(timeout = 360000)
+  public void testMinLeaderCountChoosePolicy() throws Exception {
+    conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false);
+    conf.set(OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY,
+        "org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms" +
+            ".MinLeaderCountChoosePolicy");
+    int dnNum = 3;
+    int dnPipelineLimit = 3;
+    int leaderNumOfEachDn = dnPipelineLimit / dnNum;
+    int pipelineNum = 3;
+
+    init(dnNum, dnPipelineLimit);
+    // make sure pipelines are created
+    waitForPipelines(pipelineNum);
+    // No Factor ONE pipeline is auto created.
+    Assert.assertEquals(0, pipelineManager.getPipelines(
+        HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.ONE).size());
+
+    // pipelineNum pipelines in 3 datanodes,
+    // each datanode has leaderNumOfEachDn leaders after balance
+    checkLeaderBalance(dnNum, leaderNumOfEachDn);
+
+    Random r = new Random(0);
+    for (int i = 0; i < 10; i++) {
+      // destroy some pipelines, wait new pipelines created,
+      // then check leader balance
+
+      List<Pipeline> pipelines = pipelineManager
+          .getPipelines(HddsProtos.ReplicationType.RATIS,
+              HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN);
+
+      int destroyNum = r.nextInt(pipelines.size());
+      for (int k = 0; k <= destroyNum; k++) {
+        pipelineManager.finalizeAndDestroyPipeline(pipelines.get(k), false);
+      }
+
+      waitForPipelines(pipelineNum);
+
+      checkLeaderBalance(dnNum, leaderNumOfEachDn);
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testDefaultLeaderChoosePolicy() throws Exception {
+    conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false);
+    conf.set(OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY,
+        "org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms" +
+            ".DefaultLeaderChoosePolicy");
+    int dnNum = 3;
+    int dnPipelineLimit = 3;
+    int pipelineNum = 3;
+
+    init(dnNum, dnPipelineLimit);
+    // make sure pipelines are created
+    waitForPipelines(pipelineNum);
+  }
+
+  private void waitForPipelines(int numPipelines)
+      throws TimeoutException, InterruptedException {
+    GenericTestUtils.waitFor(() -> pipelineManager
+        .getPipelines(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
+        .size() >= numPipelines, 100, 60000);
+  }
+}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index bd677db..6236900 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -169,6 +169,6 @@ public class TestRatisPipelineCreateAndDestroy {
     GenericTestUtils.waitFor(() -> pipelineManager
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
-        .size() >= numPipelines, 100, 40000);
+        .size() >= numPipelines, 100, 60000);
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index 9a9f0c7..46e3d67 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -102,7 +102,7 @@ public class TestMiniOzoneCluster {
     FileUtils.deleteQuietly(READ_TMP);
   }
 
-  @Test(timeout = 30000)
+  @Test(timeout = 60000)
   public void testStartMultipleDatanodes() throws Exception {
     final int numberOfNodes = 3;
     cluster = MiniOzoneCluster.newBuilder(conf)
@@ -290,7 +290,7 @@ public class TestMiniOzoneCluster {
    * Test that a DN can register with SCM even if it was started before the SCM.
    * @throws Exception
    */
-  @Test (timeout = 60000)
+  @Test (timeout = 100000)
   public void testDNstartAfterSCM() throws Exception {
     // Start a cluster with 3 DN
     cluster = MiniOzoneCluster.newBuilder(conf)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRocksDBLogging.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRocksDBLogging.java
index 57c7061..5ddde8a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRocksDBLogging.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerRocksDBLogging.java
@@ -42,7 +42,7 @@ public class TestOzoneManagerRocksDBLogging {
   private RocksDBConfiguration dbConf;
 
   @Rule
-  public Timeout timeout = new Timeout(60000);
+  public Timeout timeout = new Timeout(100000);
 
   private static GenericTestUtils.LogCapturer logCapturer =
       GenericTestUtils.LogCapturer.captureLogs(DBStoreBuilder.ROCKS_DB_LOGGER);

