You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by el...@apache.org on 2019/11/20 09:02:07 UTC

[hadoop-ozone] branch HDDS-2531 created (now 9d21c88)

This is an automated email from the ASF dual-hosted git repository.

elek pushed a change to branch HDDS-2531
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git.


      at 9d21c88  shell script fix from @adoroszlai

This branch includes the following new commits:

     new 3e7e212  Revert "Revert "HDDS-2034. Async RATIS pipeline creation and destroy through heartbeat commands (#29)""
     new 9d21c88  shell script fix from @adoroszlai

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org


[hadoop-ozone] 02/02: shell script fix from @adoroszlai

Posted by el...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-2531
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 9d21c88ee596d5e87a8cfeaf664b4cbf3c8976e2
Author: Márton Elek <el...@apache.org>
AuthorDate: Wed Nov 20 10:01:39 2019 +0100

    shell script fix from @adoroszlai
---
 hadoop-ozone/dist/src/main/compose/testlib.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/hadoop-ozone/dist/src/main/compose/testlib.sh b/hadoop-ozone/dist/src/main/compose/testlib.sh
index 684f2f5..49274c0 100755
--- a/hadoop-ozone/dist/src/main/compose/testlib.sh
+++ b/hadoop-ozone/dist/src/main/compose/testlib.sh
@@ -94,7 +94,7 @@ wait_for_safemode_exit(){
      #This line checks the safemode status in scm
      local command="ozone scmcli safemode status"
      if [[ "${SECURITY_ENABLED}" == 'true' ]]; then
-         status=`docker-compose -f "${compose_file}" exec -T scm bash -c "kinit -k HTTP/scm@EXAMPLE.COM -t /etc/security/keytabs/HTTP.keytab && $command'"`
+         status=`docker-compose -f "${compose_file}" exec -T scm bash -c "kinit -k HTTP/scm@EXAMPLE.COM -t /etc/security/keytabs/HTTP.keytab && $command"`
      else
          status=`docker-compose -f "${compose_file}" exec -T scm bash -c "$command"`
      fi


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org


[hadoop-ozone] 01/02: Revert "Revert "HDDS-2034. Async RATIS pipeline creation and destroy through heartbeat commands (#29)""

Posted by el...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-2531
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 3e7e21244c9a0bf249c976f20ae4e559acd1013d
Author: Márton Elek <el...@apache.org>
AuthorDate: Wed Nov 20 10:00:11 2019 +0100

    Revert "Revert "HDDS-2034. Async RATIS pipeline creation and destroy through heartbeat commands (#29)""
    
    This reverts commit dcfe5f34d79473def14f25812d77c6c685b92e58.
---
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |  12 +-
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  |   5 +
 .../org/apache/hadoop/hdds/utils/Scheduler.java    |   7 +-
 .../common/src/main/resources/ozone-default.xml    |  30 ++-
 .../common/statemachine/DatanodeStateMachine.java  |   6 +
 .../CloseContainerCommandHandler.java              |  11 +-
 .../ClosePipelineCommandHandler.java               | 120 ++++++++++++
 .../commandhandler/CommandHandler.java             |   2 +-
 .../CreatePipelineCommandHandler.java              | 135 +++++++++++++
 .../states/endpoint/HeartbeatEndpointTask.java     |  22 +++
 .../common/transport/server/XceiverServerSpi.java  |  18 ++
 .../transport/server/ratis/XceiverServerRatis.java |  36 ++++
 .../protocol/commands/ClosePipelineCommand.java    |  73 +++++++
 .../protocol/commands/CreatePipelineCommand.java   | 100 ++++++++++
 .../proto/StorageContainerDatanodeProtocol.proto   |  23 +++
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |   3 +
 .../hdds/scm/container/ContainerStateManager.java  |   1 +
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |  12 +-
 .../scm/pipeline/BackgroundPipelineCreator.java    |   9 +-
 .../hadoop/hdds/scm/pipeline/PipelineFactory.java  |  13 +-
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |  15 +-
 .../hadoop/hdds/scm/pipeline/PipelineProvider.java |   2 +
 .../hdds/scm/pipeline/PipelineReportHandler.java   |  54 +++---
 .../hdds/scm/pipeline/PipelineStateManager.java    |   4 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 137 +++++---------
 .../hdds/scm/pipeline/RatisPipelineUtils.java      | 103 ----------
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  82 ++++++--
 .../hdds/scm/pipeline/SCMPipelineMetrics.java      |  10 +
 .../hdds/scm/pipeline/SimplePipelineProvider.java  |   5 +
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |  83 ++++----
 .../safemode/OneReplicaPipelineSafeModeRule.java   |  65 +++----
 .../hdds/scm/safemode/SCMSafeModeManager.java      |  16 +-
 .../hadoop/hdds/scm/safemode/SafeModeHandler.java  |   5 +-
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java |   1 +
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |  20 ++
 .../hdds/scm/server/StorageContainerManager.java   |   3 +-
 .../java/org/apache/hadoop/hdds/scm/TestUtils.java |  12 ++
 .../hadoop/hdds/scm/block/TestBlockManager.java    |  44 ++++-
 .../container/TestCloseContainerEventHandler.java  |  11 +-
 .../scm/container/TestSCMContainerManager.java     |   4 +-
 .../hdds/scm/node/TestContainerPlacement.java      |   2 +-
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |  11 +-
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |   9 +-
 .../scm/pipeline/MockRatisPipelineProvider.java    |  10 +-
 .../scm/pipeline/TestRatisPipelineProvider.java    |  26 ++-
 .../safemode/TestHealthyPipelineSafeModeRule.java  |  44 ++---
 .../TestOneReplicaPipelineSafeModeRule.java        |  36 +---
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |  50 ++---
 hadoop-ozone/dist/src/main/compose/testlib.sh      |  35 ++++
 .../TestContainerStateManagerIntegration.java      |   6 +-
 .../metrics/TestSCMContainerManagerMetrics.java    |   3 +
 .../hdds/scm/pipeline/TestPipelineClose.java       |   4 +-
 .../scm/pipeline/TestRatisPipelineProvider.java    | 210 +++++++++++++++++++++
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |  93 +++++++--
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |   2 +-
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  43 ++---
 .../hadoop/ozone/TestContainerOperations.java      |   2 +-
 .../TestContainerStateMachineIdempotency.java      |   2 +-
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  |   6 +-
 .../hadoop/ozone/TestStorageContainerManager.java  |  18 +-
 .../apache/hadoop/ozone/client/rpc/TestBCSID.java  |   3 +
 .../client/rpc/TestContainerStateMachine.java      |   3 +
 .../rpc/TestContainerStateMachineFailures.java     |   9 +-
 .../ozone/container/TestContainerReplication.java  |   2 +-
 .../commandhandler/TestBlockDeletion.java          |   6 +-
 .../commandhandler/TestCloseContainerHandler.java  |   2 +
 .../commandhandler/TestDeleteContainerHandler.java |   2 +
 .../hadoop/ozone/dn/scrubber/TestDataScrubber.java |   2 +
 .../org/apache/hadoop/ozone/om/TestKeyPurging.java |   2 +-
 .../apache/hadoop/ozone/om/TestScmSafeMode.java    |   2 +-
 .../hadoop/ozone/scm/TestContainerSmallFile.java   |   2 +-
 .../scm/TestGetCommittedBlockLengthAndPutKey.java  |   2 +-
 .../org/apache/hadoop/ozone/scm/TestSCMMXBean.java |   2 +-
 .../hadoop/ozone/scm/node/TestSCMNodeMetrics.java  |   4 +-
 .../hadoop/fs/ozone/TestOzoneFsRenameDir.java      |   2 +-
 .../hadoop/ozone/fsck/TestContainerMapper.java     |   2 +-
 76 files changed, 1403 insertions(+), 570 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 99972ae..5e161b3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -81,7 +81,12 @@ public final class HddsConfigKeys {
   public static final String HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK =
       "hdds.scm.safemode.pipeline-availability.check";
   public static final boolean
-      HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = false;
+      HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = true;
+
+  public static final String HDDS_SCM_SAFEMODE_PIPELINE_CREATION =
+      "hdds.scm.safemode.pipeline.creation";
+  public static final boolean
+      HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT = true;
 
   // % of containers which should have at least one reported replica
   // before SCM comes out of safe mode.
@@ -89,13 +94,16 @@ public final class HddsConfigKeys {
       "hdds.scm.safemode.threshold.pct";
   public static final double HDDS_SCM_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.99;
 
-
   // percentage of healthy pipelines, where all 3 datanodes are reported in the
   // pipeline.
   public static final String HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT =
       "hdds.scm.safemode.healthy.pipelie.pct";
   public static final double
       HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT = 0.10;
+  // number of healthy RATIS pipeline(ONE or THREE factor)
+  public static final String HDDS_SCM_SAFEMODE_MIN_PIPELINE =
+      "hdds.scm.safemode.min.pipeline";
+  public static final int HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT = 1;
 
   public static final String HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT =
       "hdds.scm.safemode.atleast.one.node.reported.pipeline.pct";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 47ec453..51598a7 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -155,6 +155,11 @@ public final class Pipeline {
     return state == PipelineState.OPEN;
   }
 
+  public boolean isAllocationTimeout() {
+    //TODO: define a system property to control the timeout value
+    return false;
+  }
+
   public void setNodesInOrder(List<DatanodeDetails> nodes) {
     nodesInOrder.set(nodes);
   }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java
index 9edc104..f5e55c1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/utils/Scheduler.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -72,9 +73,9 @@ public class Scheduler {
     }, delay, timeUnit);
   }
 
-  public void scheduleWithFixedDelay(Runnable runnable, long initialDelay,
-      long fixedDelay, TimeUnit timeUnit) {
-    scheduler
+  public ScheduledFuture<?> scheduleWithFixedDelay(Runnable runnable,
+      long initialDelay, long fixedDelay, TimeUnit timeUnit) {
+    return scheduler
         .scheduleWithFixedDelay(runnable, initialDelay, fixedDelay, timeUnit);
   }
 
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2f9ce31..c20a6a4 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -319,15 +319,6 @@
       defined with postfix (ns,ms,s,m,h,d)</description>
   </property>
   <property>
-    <name>hdds.command.status.report.interval</name>
-    <value>60000ms</value>
-    <tag>OZONE, CONTAINER, MANAGEMENT</tag>
-    <description>Time interval of the datanode to send status of command
-      execution. Each datanode periodically the execution status of commands
-      received from SCM to SCM. Unit could be defined with postfix
-      (ns,ms,s,m,h,d)</description>
-  </property>
-  <property>
     <name>hdds.pipeline.report.interval</name>
     <value>60000ms</value>
     <tag>OZONE, PIPELINE, MANAGEMENT</tag>
@@ -1300,7 +1291,7 @@
 
   <property>
     <name>hdds.scm.safemode.pipeline-availability.check</name>
-    <value>false</value>
+    <value>true</value>
     <tag>HDDS,SCM,OPERATION</tag>
     <description>
       Boolean value to enable pipeline availability check during SCM safe mode.
@@ -1386,6 +1377,25 @@
   </property>
 
   <property>
+    <name>hdds.scm.safemode.pipeline.creation</name>
+    <value>true</value>
+    <tag>HDDS,SCM,OPERATION</tag>
+    <description>
+      Boolean value to enable background pipeline creation in SCM safe mode.
+    </description>
+  </property>
+
+  <property>
+    <name>hdds.scm.safemode.min.pipeline</name>
+    <value>1</value>
+    <tag>HDDS,SCM,OPERATION</tag>
+    <description>
+      Minimum RATIS pipeline number to exit SCM safe mode. Considered only when
+      "hdds.scm.safemode.pipeline.creation" is True.
+    </description>
+  </property>
+
+  <property>
     <name>hdds.lock.max.concurrency</name>
     <value>100</value>
     <tag>HDDS</tag>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 5424b6b..e7fda00 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -39,8 +39,12 @@ import org.apache.hadoop.ozone.container.common.report.ReportManager;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CloseContainerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .ClosePipelineCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CommandDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .CreatePipelineCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .DeleteBlocksCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .DeleteContainerCommandHandler;
@@ -132,6 +136,8 @@ public class DatanodeStateMachine implements Closeable {
         .addHandler(new ReplicateContainerCommandHandler(conf, supervisor))
         .addHandler(new DeleteContainerCommandHandler(
             dnConf.getContainerDeleteThreads()))
+        .addHandler(new ClosePipelineCommandHandler())
+        .addHandler(new CreatePipelineCommandHandler())
         .setConnectionManager(connectionManager)
         .setContainer(container)
         .setContext(context)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index ef06c14..3c4e24a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Handler for close container command received from SCM.
@@ -48,7 +49,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
   private static final Logger LOG =
       LoggerFactory.getLogger(CloseContainerCommandHandler.class);
 
-  private int invocationCount;
+  private AtomicLong invocationCount = new AtomicLong(0);
   private long totalTime;
 
   /**
@@ -69,7 +70,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
   public void handle(SCMCommand command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
     LOG.debug("Processing Close Container command.");
-    invocationCount++;
+    invocationCount.incrementAndGet();
     final long startTime = Time.monotonicNow();
     final DatanodeDetails datanodeDetails = context.getParent()
         .getDatanodeDetails();
@@ -162,7 +163,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
    */
   @Override
   public int getInvocationCount() {
-    return invocationCount;
+    return (int)invocationCount.get();
   }
 
   /**
@@ -172,8 +173,8 @@ public class CloseContainerCommandHandler implements CommandHandler {
    */
   @Override
   public long getAverageRunTime() {
-    if (invocationCount > 0) {
-      return totalTime / invocationCount;
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
     }
     return 0;
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
new file mode 100644
index 0000000..b1c6090
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server
+    .XceiverServerSpi;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Handler for close pipeline command received from SCM.
+ */
+public class ClosePipelineCommandHandler implements CommandHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ClosePipelineCommandHandler.class);
+
+  private AtomicLong invocationCount = new AtomicLong(0);
+  private long totalTime;
+
+  /**
+   * Constructs a closePipelineCommand handler.
+   */
+  public ClosePipelineCommandHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command           - SCM Command
+   * @param ozoneContainer    - Ozone Container.
+   * @param context           - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+      StateContext context, SCMConnectionManager connectionManager) {
+    invocationCount.incrementAndGet();
+    final long startTime = Time.monotonicNow();
+    final DatanodeDetails dn = context.getParent().getDatanodeDetails();
+    final ClosePipelineCommandProto closeCommand =
+        ((ClosePipelineCommand)command).getProto();
+    final HddsProtos.PipelineID pipelineID = closeCommand.getPipelineID();
+
+    try {
+      XceiverServerSpi server = ozoneContainer.getWriteChannel();
+      server.removeGroup(pipelineID);
+      LOG.info("Close Pipeline #{} command on datanode #{}.", pipelineID,
+          dn.getUuidString());
+    } catch (IOException e) {
+      LOG.error("Can't close pipeline #{}", pipelineID, e);
+    } finally {
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.closePipelineCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return (int)invocationCount.get();
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
+    }
+    return 0;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
index 70ed9ca..3a6566f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -68,7 +68,7 @@ public interface CommandHandler {
   default void updateCommandStatus(StateContext context, SCMCommand command,
       Consumer<CommandStatus> cmdStatusUpdater, Logger log) {
     if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
-      log.debug("{} with Id:{} not found.", command.getType(),
+      log.warn("{} with Id:{} not found.", command.getType(),
           command.getId());
     }
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
new file mode 100644
index 0000000..3a60d7e
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.common.transport.server
+    .XceiverServerSpi;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+/**
+ * Handler for create pipeline command received from SCM.
+ */
+public class CreatePipelineCommandHandler implements CommandHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CreatePipelineCommandHandler.class);
+
+  private AtomicLong invocationCount = new AtomicLong(0);
+  private long totalTime;
+
+  /**
+   * Constructs a createPipelineCommand handler.
+   */
+  public CreatePipelineCommandHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command           - SCM Command
+   * @param ozoneContainer    - Ozone Container.
+   * @param context           - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+      StateContext context, SCMConnectionManager connectionManager) {
+    invocationCount.incrementAndGet();
+    final long startTime = Time.monotonicNow();
+    final DatanodeDetails dn = context.getParent()
+        .getDatanodeDetails();
+    final CreatePipelineCommandProto createCommand =
+        ((CreatePipelineCommand)command).getProto();
+    final HddsProtos.PipelineID pipelineID = createCommand.getPipelineID();
+    Collection<DatanodeDetails> peers =
+        createCommand.getDatanodeList().stream()
+            .map(DatanodeDetails::getFromProtoBuf)
+            .collect(Collectors.toList());
+
+    try {
+      XceiverServerSpi server = ozoneContainer.getWriteChannel();
+      server.addGroup(pipelineID, peers);
+      LOG.info("Create Pipeline {} {} #{} command succeed on datanode {}.",
+          createCommand.getType(), createCommand.getFactor(), pipelineID,
+          dn.getUuidString());
+      // Trigger heartbeat report
+      context.addReport(context.getParent().getContainer().getPipelineReport());
+      context.getParent().triggerHeartbeat();
+    } catch (NotLeaderException e) {
+      LOG.debug("Follower cannot create pipeline #{}.", pipelineID);
+    } catch (IOException e) {
+      LOG.error("Can't create pipeline {} {} #{}", createCommand.getType(),
+          createCommand.getFactor(), pipelineID, e);
+    } finally {
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.createPipelineCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return (int)invocationCount.get();
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
+    }
+    return 0;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index c50f457..a55d0d6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine.EndPointStates;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -309,6 +311,26 @@ public class HeartbeatEndpointTask
         }
         this.context.addCommand(deleteContainerCommand);
         break;
+      case createPipelineCommand:
+        CreatePipelineCommand createPipelineCommand =
+            CreatePipelineCommand.getFromProtobuf(
+                commandResponseProto.getCreatePipelineCommandProto());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received SCM create pipeline request {}",
+              createPipelineCommand.getPipelineID());
+        }
+        this.context.addCommand(createPipelineCommand);
+        break;
+      case closePipelineCommand:
+        ClosePipelineCommand closePipelineCommand =
+            ClosePipelineCommand.getFromProtobuf(
+                commandResponseProto.getClosePipelineCommandProto());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received SCM close pipeline request {}",
+              closePipelineCommand.getPipelineID());
+        }
+        this.context.addCommand(closePipelineCommand);
+        break;
       default:
         throw new IllegalArgumentException("Unknown response : "
             + commandResponseProto.getCommandType().name());
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
index 4e0d343..01f463c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerSpi.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.common.transport.server;
 
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -25,6 +26,7 @@ import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReport;
 
 import java.io.IOException;
+import java.util.Collection;
 import java.util.List;
 
 /** A server endpoint that acts as the communication layer for Ozone
@@ -60,6 +62,22 @@ public interface XceiverServerSpi {
    */
   boolean isExist(HddsProtos.PipelineID pipelineId);
 
+
+  /**
+   * Join a new pipeline; this default implementation does nothing.
+   */
+  default void addGroup(HddsProtos.PipelineID pipelineId,
+      Collection<DatanodeDetails> peers) throws IOException {
+  }
+
+
+  /**
+   * Exit a pipeline; this default implementation does nothing.
+   */
+  default void removeGroup(HddsProtos.PipelineID pipelineId)
+      throws IOException {
+  }
+
   /**
    * Get pipeline report for the XceiverServer instance.
    * @return list of report for each pipeline.
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 1146394..a76944b 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -67,6 +67,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -620,6 +621,41 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     return pipelineIDs;
   }
 
+  @Override
+  public void addGroup(HddsProtos.PipelineID pipelineId,
+      Collection<DatanodeDetails> peers) throws IOException {
+    // The Raft group is keyed by the pipeline id and built from the peers.
+    final RaftGroupId raftGroupId =
+        RaftGroupId.valueOf(PipelineID.getFromProtobuf(pipelineId).getId());
+    final RaftGroup raftGroup = RatisHelper.newRaftGroup(raftGroupId, peers);
+    final GroupManagementRequest addRequest = GroupManagementRequest.newAdd(
+        clientId, server.getId(), nextCallId(), raftGroup);
+    final RaftClientReply addReply;
+    try {
+      addReply = server.groupManagement(addRequest);
+    } catch (Exception ex) {
+      throw new IOException(ex.getMessage(), ex);
+    }
+    processReply(addReply);
+  }
+
+  @Override
+  public void removeGroup(HddsProtos.PipelineID pipelineId)
+      throws IOException {
+    // Trailing 'true' is presumably Ratis' delete-directory flag - confirm.
+    final GroupManagementRequest removeRequest =
+        GroupManagementRequest.newRemove(clientId, server.getId(),
+            nextCallId(), RaftGroupId.valueOf(
+                PipelineID.getFromProtobuf(pipelineId).getId()), true);
+    final RaftClientReply removeReply;
+    try {
+      removeReply = server.groupManagement(removeRequest);
+    } catch (Exception ex) {
+      throw new IOException(ex.getMessage(), ex);
+    }
+    processReply(removeReply);
+  }
+
   void handleNodeSlowness(RaftGroupId groupId, RoleInfoProto roleInfoProto) {
     handlePipelineFailure(groupId, roleInfoProto);
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java
new file mode 100644
index 0000000..1f75bc3
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+
+/**
+ * Asks datanode to close a pipeline.
+ */
+public class ClosePipelineCommand
+    extends SCMCommand<ClosePipelineCommandProto> {
+
+  private final PipelineID pipelineID;
+
+  /** Builds a command whose id comes from SCMCommand's no-arg constructor. */
+  public ClosePipelineCommand(final PipelineID pipelineID) {
+    super();
+    this.pipelineID = pipelineID;
+  }
+
+  /** Builds a command that carries the given, already known command id. */
+  public ClosePipelineCommand(long cmdId, final PipelineID pipelineID) {
+    super(cmdId);
+    this.pipelineID = pipelineID;
+  }
+
+  /** Returns the closePipelineCommand type constant. */
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.closePipelineCommand;
+  }
+
+  /** Serializes the command id and the pipeline id into a protobuf message. */
+  @Override
+  public ClosePipelineCommandProto getProto() {
+    return ClosePipelineCommandProto.newBuilder()
+        .setCmdId(getId())
+        .setPipelineID(pipelineID.getProtobuf())
+        .build();
+  }
+
+  /** Rebuilds the command from its protobuf form, which must be non-null. */
+  public static ClosePipelineCommand getFromProtobuf(
+      ClosePipelineCommandProto closePipelineProto) {
+    Preconditions.checkNotNull(closePipelineProto);
+    return new ClosePipelineCommand(closePipelineProto.getCmdId(),
+        PipelineID.getFromProtobuf(closePipelineProto.getPipelineID()));
+  }
+
+  /** Returns the id of the pipeline this command asks to close. */
+  public PipelineID getPipelineID() {
+    return pipelineID;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
new file mode 100644
index 0000000..9e22cbc
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Asks datanode to create a pipeline.
+ */
+public class CreatePipelineCommand
+    extends SCMCommand<CreatePipelineCommandProto> {
+
+  private final PipelineID pipelineID;
+  private final ReplicationFactor factor;
+  private final ReplicationType type;
+  private final List<DatanodeDetails> nodelist;
+
+  /** Builds a command whose id comes from SCMCommand's no-arg constructor. */
+  public CreatePipelineCommand(final PipelineID pipelineID,
+      final ReplicationType type, final ReplicationFactor factor,
+      final List<DatanodeDetails> datanodeList) {
+    super();
+    this.pipelineID = pipelineID;
+    this.type = type;
+    this.factor = factor;
+    this.nodelist = datanodeList;
+  }
+
+  /** Builds a command that carries the given, already known command id. */
+  public CreatePipelineCommand(long cmdId, final PipelineID pipelineID,
+      final ReplicationType type, final ReplicationFactor factor,
+      final List<DatanodeDetails> datanodeList) {
+    super(cmdId);
+    this.pipelineID = pipelineID;
+    this.type = type;
+    this.factor = factor;
+    this.nodelist = datanodeList;
+  }
+
+  /** Returns the createPipelineCommand type constant. */
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.createPipelineCommand;
+  }
+
+  /** Serializes command id, pipeline id, factor, type and datanodes. */
+  @Override
+  public CreatePipelineCommandProto getProto() {
+    final CreatePipelineCommandProto.Builder protoBuilder =
+        CreatePipelineCommandProto.newBuilder().setCmdId(getId())
+            .setPipelineID(pipelineID.getProtobuf())
+            .setFactor(factor).setType(type);
+    for (DatanodeDetails node : nodelist) {
+      protoBuilder.addDatanode(node.getProtoBufMessage());
+    }
+    return protoBuilder.build();
+  }
+
+  /** Rebuilds the command from its protobuf form, which must be non-null. */
+  public static CreatePipelineCommand getFromProtobuf(
+      CreatePipelineCommandProto createPipelineProto) {
+    Preconditions.checkNotNull(createPipelineProto);
+    final List<DatanodeDetails> nodes = createPipelineProto.getDatanodeList()
+        .stream().map(DatanodeDetails::getFromProtoBuf)
+        .collect(Collectors.toList());
+    return new CreatePipelineCommand(createPipelineProto.getCmdId(),
+        PipelineID.getFromProtobuf(createPipelineProto.getPipelineID()),
+        createPipelineProto.getType(), createPipelineProto.getFactor(), nodes);
+  }
+
+  /** Returns the id of the pipeline this command asks to create. */
+  public PipelineID getPipelineID() {
+    return pipelineID;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 45a1db6..8b272c8 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -283,6 +283,8 @@ message SCMCommandProto {
     closeContainerCommand = 3;
     deleteContainerCommand = 4;
     replicateContainerCommand = 5;
+    createPipelineCommand = 6;
+    closePipelineCommand = 7;
   }
   // TODO: once we start using protoc 3.x, refactor this message using "oneof"
   required Type commandType = 1;
@@ -291,6 +293,8 @@ message SCMCommandProto {
   optional CloseContainerCommandProto closeContainerCommandProto = 4;
   optional DeleteContainerCommandProto deleteContainerCommandProto = 5;
   optional ReplicateContainerCommandProto replicateContainerCommandProto = 6;
+  optional CreatePipelineCommandProto createPipelineCommandProto = 7;
+  optional ClosePipelineCommandProto closePipelineCommandProto = 8;
 }
 
 /**
@@ -360,6 +364,25 @@ message ReplicateContainerCommandProto {
 }
 
 /**
+This command asks the datanode to create a pipeline.
+*/
+message CreatePipelineCommandProto {
+  required PipelineID pipelineID = 1;  // pipeline the datanode should create
+  required ReplicationType type = 2;
+  required ReplicationFactor factor = 3;
+  repeated DatanodeDetailsProto datanode = 4;  // nodes sent with the command
+  required int64 cmdId = 5;  // id of the SCM command carrying this request
+}
+
+/**
+This command asks the datanode to close a pipeline.
+*/
+message ClosePipelineCommandProto {
+  required PipelineID pipelineID = 1;  // pipeline the datanode should close
+  required int64 cmdId = 2;  // id of the SCM command carrying this request
+}
+
+/**
  * Protocol used from a datanode to StorageContainerManager.
  *
  * Please see the request and response messages for details of the RPC calls.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 845bdf1..b7a7525 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import javax.management.ObjectName;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -196,6 +197,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           // TODO: #CLUTIL Remove creation logic when all replication types and
           // factors are handled by pipeline creator
           pipeline = pipelineManager.createPipeline(type, factor);
+          // wait until pipeline is ready
+          pipelineManager.waitPipelineReady(pipeline.getId(), 0);
         } catch (IOException e) {
           LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
                   "get pipelines call once.", type, factor, e);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 7dde8d7..cefc185 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -253,6 +253,7 @@ public class ContainerStateManager {
       // TODO: #CLUTIL remove creation logic when all replication types and
       // factors are handled by pipeline creator job.
       pipeline = pipelineManager.createPipeline(type, replicationFactor);
+      pipelineManager.waitPipelineReady(pipeline.getId(), 0);
     } catch (IOException e) {
       final List<Pipeline> pipelines = pipelineManager
           .getPipelines(type, replicationFactor, Pipeline.PipelineState.OPEN);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 43d396e..6de05fd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.events;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -97,15 +98,14 @@ public final class SCMEvents {
           new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report");
 
   /**
-   * PipelineReport processed by pipeline report handler. This event is
+   * Open pipeline event sent by PipelineReportHandler. This event is
    * received by HealthyPipelineSafeModeRule.
    */
-  public static final TypedEvent<PipelineReportFromDatanode>
-      PROCESSED_PIPELINE_REPORT = new TypedEvent<>(
-          PipelineReportFromDatanode.class, "Processed_Pipeline_Report");
+  public static final TypedEvent<Pipeline>
+      OPEN_PIPELINE = new TypedEvent<>(Pipeline.class, "Open_Pipeline");
 
   /**
-   * PipelineActions are sent by Datanode. This event is received by
+   * PipelineActions are sent by Datanode to close a pipeline. It's received by
    * SCMDatanodeHeartbeatDispatcher and PIPELINE_ACTIONS event is generated.
    */
   public static final TypedEvent<PipelineActionsFromDatanode>
@@ -113,7 +113,7 @@ public final class SCMEvents {
       "Pipeline_Actions");
 
   /**
-   * A Command status report will be sent by datanodes. This repoort is received
+   * A Command status report will be sent by datanodes. This report is received
    * by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated.
    */
   public static final TypedEvent<CommandStatusReportFromDatanode>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 6873566..4065c2f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -26,6 +26,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -41,6 +42,7 @@ class BackgroundPipelineCreator {
   private final AtomicBoolean isPipelineCreatorRunning;
   private final PipelineManager pipelineManager;
   private final Configuration conf;
+  private ScheduledFuture<?> periodicTask;
 
   BackgroundPipelineCreator(PipelineManager pipelineManager,
       Scheduler scheduler, Configuration conf) {
@@ -57,13 +59,16 @@ class BackgroundPipelineCreator {
   /**
    * Schedules a fixed interval job to create pipelines.
    */
-  void startFixedIntervalPipelineCreator() {
+  synchronized void startFixedIntervalPipelineCreator() {
+    if (periodicTask != null) {
+      return;
+    }
     long intervalInMillis = conf
         .getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
             ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
             TimeUnit.MILLISECONDS);
     // TODO: #CLUTIL We can start the job asap
-    scheduler.scheduleWithFixedDelay(() -> {
+    periodicTask = scheduler.scheduleWithFixedDelay(() -> {
       if (!shouldSchedulePipelineCreator()) {
         return;
       }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 77e037a..86ad5ee 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -24,7 +24,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -39,12 +40,13 @@ public final class PipelineFactory {
   private Map<ReplicationType, PipelineProvider> providers;
 
   PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager,
-      Configuration conf, GrpcTlsConfig tlsConfig) {
+      Configuration conf, EventPublisher eventPublisher) {
     providers = new HashMap<>();
     providers.put(ReplicationType.STAND_ALONE,
         new SimplePipelineProvider(nodeManager));
     providers.put(ReplicationType.RATIS,
-        new RatisPipelineProvider(nodeManager, stateManager, conf, tlsConfig));
+        new RatisPipelineProvider(nodeManager, stateManager, conf,
+            eventPublisher));
   }
 
   @VisibleForTesting
@@ -63,6 +65,11 @@ public final class PipelineFactory {
     return providers.get(type).create(factor, nodes);
   }
 
+  public void close(ReplicationType type, Pipeline pipeline)
+      throws IOException {
+    providers.get(type).close(pipeline);
+  }
+
   public void shutdown() {
     providers.values().forEach(provider -> provider.shutdown());
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 9ba5f31..779008f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.ratis.grpc.GrpcTlsConfig;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -51,6 +50,9 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
       ReplicationFactor factor);
 
   List<Pipeline> getPipelines(ReplicationType type,
+      Pipeline.PipelineState state);
+
+  List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor, Pipeline.PipelineState state);
 
   List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
@@ -95,5 +97,14 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
    */
   void deactivatePipeline(PipelineID pipelineID) throws IOException;
 
-  GrpcTlsConfig getGrpcTlsConfig();
+  /**
+   * Wait for a pipeline to be OPEN.
+   * This default implementation does nothing and returns immediately.
+   * @param pipelineID ID of the pipeline to wait for.
+   * @param timeout    wait timeout in milliseconds; if 0, use default timeout
+   * @throws IOException in case of any Exception, such as timeout
+   */
+  default void waitPipelineReady(PipelineID pipelineID, long timeout)
+      throws IOException {
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index a0ce216..c00ff78 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -33,5 +33,7 @@ public interface PipelineProvider {
 
   Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);
 
+  void close(Pipeline pipeline) throws IOException;
+
   void shutdown();
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index b8cb7b4..a7e2bf1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -19,18 +19,23 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import java.io.IOException;
-import java.util.Objects;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReport;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import org.apache.hadoop.hdds.scm.server
+    .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,10 +55,8 @@ public class PipelineReportHandler implements
   private final boolean pipelineAvailabilityCheck;
 
   public PipelineReportHandler(SCMSafeModeManager scmSafeModeManager,
-      PipelineManager pipelineManager,
-      Configuration conf) {
+      PipelineManager pipelineManager, Configuration conf) {
     Preconditions.checkNotNull(pipelineManager);
-    Objects.requireNonNull(scmSafeModeManager);
     this.scmSafeModeManager = scmSafeModeManager;
     this.pipelineManager = pipelineManager;
     this.conf = conf;
@@ -76,48 +79,45 @@ public class PipelineReportHandler implements
     }
     for (PipelineReport report : pipelineReport.getPipelineReportList()) {
       try {
-        processPipelineReport(report, dn);
+        processPipelineReport(report, dn, publisher);
       } catch (IOException e) {
         LOGGER.error("Could not process pipeline report={} from dn={} {}",
             report, dn, e);
       }
     }
-    if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
-      publisher.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          pipelineReportFromDatanode);
-    }
   }
 
-  private void processPipelineReport(PipelineReport report, DatanodeDetails dn)
-      throws IOException {
+  private void processPipelineReport(PipelineReport report, DatanodeDetails dn,
+      EventPublisher publisher) throws IOException {
     PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID());
     Pipeline pipeline;
     try {
       pipeline = pipelineManager.getPipeline(pipelineID);
     } catch (PipelineNotFoundException e) {
-      RatisPipelineUtils.destroyPipeline(dn, pipelineID, conf,
-          pipelineManager.getGrpcTlsConfig());
+      final ClosePipelineCommand closeCommand =
+          new ClosePipelineCommand(pipelineID);
+      final CommandForDatanode datanodeCommand =
+          new CommandForDatanode<>(dn.getUuid(), closeCommand);
+      publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
       return;
     }
 
     pipeline.reportDatanode(dn);
-    if (report.getIsLeader()) {
+    // ONE replica pipeline doesn't have leader flag
+    if (report.getIsLeader() ||
+        pipeline.getFactor() == HddsProtos.ReplicationFactor.ONE) {
       pipeline.setLeaderId(dn.getUuid());
     }
-    if ((pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED)
-        && pipeline.isHealthy()) {
-      pipelineManager.openPipeline(pipelineID);
-    }
 
     if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
-
-
-      if (report.getIsLeader()) {
-        // Pipeline reported as the leader
-        pipeline.setLeaderId(dn.getUuid());
+      LOGGER.info("Pipeline {} {} reported by {}", pipeline.getFactor(),
+          pipeline.getId(), dn);
+      if (pipeline.isHealthy()) {
         pipelineManager.openPipeline(pipelineID);
+        if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
+          publisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
+        }
       }
     }
-    pipeline.reportDatanode(dn);
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index 2410b54..180d0bf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -129,9 +129,9 @@ class PipelineStateManager {
       throw new IOException("Closed pipeline can not be opened");
     }
     if (pipeline.getPipelineState() == PipelineState.ALLOCATED) {
-      pipeline = pipelineStateMap.updatePipelineState(
-          pipelineId, PipelineState.OPEN);
       LOG.info("Pipeline {} moved to OPEN state", pipeline.toString());
+      pipeline = pipelineStateMap
+          .updatePipelineState(pipelineId, PipelineState.OPEN);
     }
     return pipeline;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 94443dd..a2ee50a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -24,37 +24,28 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .ContainerPlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
-import org.apache.hadoop.io.MultipleIOException;
-import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftClientReply;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.function.CheckedBiConsumer;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -69,6 +60,7 @@ public class RatisPipelineProvider implements PipelineProvider {
   private final NodeManager nodeManager;
   private final PipelineStateManager stateManager;
   private final Configuration conf;
+  private final EventPublisher eventPublisher;
 
   // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
   private final int parallelismForPool = 3;
@@ -83,15 +75,14 @@ public class RatisPipelineProvider implements PipelineProvider {
 
   private final ForkJoinPool forkJoinPool = new ForkJoinPool(
       parallelismForPool, factory, null, false);
-  private final GrpcTlsConfig tlsConfig;
 
   RatisPipelineProvider(NodeManager nodeManager,
       PipelineStateManager stateManager, Configuration conf,
-      GrpcTlsConfig tlsConfig) {
+      EventPublisher eventPublisher) {
     this.nodeManager = nodeManager;
     this.stateManager = stateManager;
     this.conf = conf;
-    this.tlsConfig = tlsConfig;
+    this.eventPublisher = eventPublisher;
   }
 
 
@@ -153,8 +144,27 @@ public class RatisPipelineProvider implements PipelineProvider {
       throw new InsufficientDatanodesException(e);
     }
 
-    Pipeline pipeline = create(factor, dns);
-    initializePipeline(pipeline);
+    Pipeline pipeline = Pipeline.newBuilder()
+        .setId(PipelineID.randomId())
+        .setState(PipelineState.ALLOCATED)
+        .setType(ReplicationType.RATIS)
+        .setFactor(factor)
+        .setNodes(dns)
+        .build();
+
+    // Send command to datanodes to create pipeline
+    final CreatePipelineCommand createCommand =
+        new CreatePipelineCommand(pipeline.getId(), pipeline.getType(),
+            factor, dns);
+
+    dns.stream().forEach(node -> {
+      final CommandForDatanode datanodeCommand =
+          new CommandForDatanode<>(node.getUuid(), createCommand);
+      LOG.info("Send pipeline:{} create command to datanode {}",
+          pipeline.getId(), datanodeCommand.getDatanodeId());
+      eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+    });
+
     return pipeline;
   }
 
@@ -181,69 +191,22 @@ public class RatisPipelineProvider implements PipelineProvider {
     }
   }
 
-  protected void initializePipeline(Pipeline pipeline) throws IOException {
-    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
-    }
-    callRatisRpc(pipeline.getNodes(),
-        (raftClient, peer) -> {
-          RaftClientReply reply = raftClient.groupAdd(group, peer.getId());
-          if (reply == null || !reply.isSuccess()) {
-            String msg = "Pipeline initialization failed for pipeline:"
-                + pipeline.getId() + " node:" + peer.getId();
-            LOG.error(msg);
-            throw new IOException(msg);
-          }
-        });
-  }
-
-  private void callRatisRpc(List<DatanodeDetails> datanodes,
-      CheckedBiConsumer< RaftClient, RaftPeer, IOException> rpc)
-      throws IOException {
-    if (datanodes.isEmpty()) {
-      return;
-    }
-
-    final String rpcType = conf
-        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
-    final List< IOException > exceptions =
-        Collections.synchronizedList(new ArrayList<>());
-    final int maxOutstandingRequests =
-        HddsClientUtils.getMaxOutstandingRequests(conf);
-    final TimeDuration requestTimeout =
-        RatisHelper.getClientRequestTimeout(conf);
-    try {
-      forkJoinPool.submit(() -> {
-        datanodes.parallelStream().forEach(d -> {
-          final RaftPeer p = RatisHelper.toRaftPeer(d);
-          try (RaftClient client = RatisHelper
-              .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
-                  retryPolicy, maxOutstandingRequests, tlsConfig,
-                  requestTimeout)) {
-            rpc.accept(client, p);
-          } catch (IOException ioe) {
-            String errMsg =
-                "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid();
-            LOG.error(errMsg, ioe);
-            exceptions.add(new IOException(errMsg, ioe));
-          }
-        });
-      }).get();
-    } catch (ExecutionException | RejectedExecutionException ex) {
-      LOG.error(ex.getClass().getName() + " exception occurred during " +
-          "createPipeline", ex);
-      throw new IOException(ex.getClass().getName() + " exception occurred " +
-          "during createPipeline", ex);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupt exception occurred during " +
-          "createPipeline", ex);
-    }
-    if (!exceptions.isEmpty()) {
-      throw MultipleIOException.createIOException(exceptions);
-    }
+  /**
+   * Sends a close-pipeline command to each datanode of the given pipeline
+   * by firing one DATANODE_COMMAND event per node. This method itself does
+   * not remove the pipeline from SCM.
+   *
+   * @param pipeline pipeline whose datanodes should receive the command
+   */
+  public void close(Pipeline pipeline) {
+    final ClosePipelineCommand command =
+        new ClosePipelineCommand(pipeline.getId());
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      final CommandForDatanode perNodeCommand =
+          new CommandForDatanode<>(dn.getUuid(), command);
+      LOG.info("Send pipeline:{} close command to datanode {}",
+          pipeline.getId(), perNodeCommand.getDatanodeId());
+      eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, perNodeCommand);
+    }
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
deleted file mode 100644
index 497e717..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ /dev/null
@@ -1,103 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.util.TimeDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Utility class for Ratis pipelines. Contains methods to create and destroy
- * ratis pipelines.
- */
-public final class RatisPipelineUtils {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(RatisPipelineUtils.class);
-
-  private RatisPipelineUtils() {
-  }
-  /**
-   * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
-   * the datanodes.
-   *
-   * @param pipeline        - Pipeline to be destroyed
-   * @param ozoneConf       - Ozone configuration
-   * @param grpcTlsConfig
-   * @throws IOException
-   */
-  public static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf,
-      GrpcTlsConfig grpcTlsConfig) {
-    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
-    }
-    for (DatanodeDetails dn : pipeline.getNodes()) {
-      try {
-        destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
-      } catch (IOException e) {
-        LOG.warn("Pipeline destroy failed for pipeline={} dn={}",
-            pipeline.getId(), dn);
-      }
-    }
-  }
-
-  /**
-   * Sends ratis command to destroy pipeline on the given datanode.
-   *
-   * @param dn         - Datanode on which pipeline needs to be destroyed
-   * @param pipelineID - ID of pipeline to be destroyed
-   * @param ozoneConf  - Ozone configuration
-   * @param grpcTlsConfig - grpc tls configuration
-   * @throws IOException
-   */
-  static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
-      Configuration ozoneConf, GrpcTlsConfig grpcTlsConfig) throws IOException {
-    final String rpcType = ozoneConf
-        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
-    final RaftPeer p = RatisHelper.toRaftPeer(dn);
-    final int maxOutstandingRequests =
-        HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
-    final TimeDuration requestTimeout =
-        RatisHelper.getClientRequestTimeout(ozoneConf);
-    try(RaftClient client = RatisHelper
-        .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
-            retryPolicy, maxOutstandingRequests, grpcTlsConfig,
-            requestTimeout)) {
-      client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()),
-          true, p.getId());
-    }
-  }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 0964f6d..00a4429 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -37,7 +38,7 @@ import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
 import org.apache.hadoop.hdds.utils.MetadataStore;
 import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
 import org.apache.hadoop.hdds.utils.Scheduler;
-import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,18 +82,18 @@ public class SCMPipelineManager implements PipelineManager {
   private final NodeManager nodeManager;
   private final SCMPipelineMetrics metrics;
   private final Configuration conf;
+  private long pipelineWaitDefaultTimeout;
   // Pipeline Manager MXBean
   private ObjectName pmInfoBean;
-  private GrpcTlsConfig grpcTlsConfig;
 
   public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
-      EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig)
+      EventPublisher eventPublisher)
       throws IOException {
     this.lock = new ReentrantReadWriteLock();
     this.conf = conf;
     this.stateManager = new PipelineStateManager(conf);
     this.pipelineFactory = new PipelineFactory(nodeManager, stateManager,
-        conf, grpcTlsConfig);
+        conf, eventPublisher);
     // TODO: See if thread priority needs to be set for these threads
     scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
     this.backgroundPipelineCreator =
@@ -113,8 +114,11 @@ public class SCMPipelineManager implements PipelineManager {
     this.metrics = SCMPipelineMetrics.create();
     this.pmInfoBean = MBeans.register("SCMPipelineManager",
         "SCMPipelineManagerInfo", this);
+    this.pipelineWaitDefaultTimeout = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
+        HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
     initializePipelineState();
-    this.grpcTlsConfig = grpcTlsConfig;
   }
 
   public PipelineStateManager getStateManager() {
@@ -148,8 +152,8 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   @Override
-  public synchronized Pipeline createPipeline(
-      ReplicationType type, ReplicationFactor factor) throws IOException {
+  public synchronized Pipeline createPipeline(ReplicationType type,
+      ReplicationFactor factor) throws IOException {
     lock.writeLock().lock();
     try {
       Pipeline pipeline = pipelineFactory.create(type, factor);
@@ -157,8 +161,11 @@ public class SCMPipelineManager implements PipelineManager {
           pipeline.getProtobufMessage().toByteArray());
       stateManager.addPipeline(pipeline);
       nodeManager.addPipeline(pipeline);
-      metrics.incNumPipelineCreated();
-      metrics.createPerPipelineMetrics(pipeline);
+      metrics.incNumPipelineAllocated();
+      if (pipeline.isOpen()) {
+        metrics.incNumPipelineCreated();
+        metrics.createPerPipelineMetrics(pipeline);
+      }
       return pipeline;
     } catch (InsufficientDatanodesException idEx) {
       throw idEx;
@@ -225,6 +232,16 @@ public class SCMPipelineManager implements PipelineManager {
     }
   }
 
+  public List<Pipeline> getPipelines(ReplicationType type,
+      Pipeline.PipelineState state) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines(type, state);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
   @Override
   public List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor, Pipeline.PipelineState state) {
@@ -293,6 +310,7 @@ public class SCMPipelineManager implements PipelineManager {
     lock.writeLock().lock();
     try {
       Pipeline pipeline = stateManager.openPipeline(pipelineId);
+      metrics.incNumPipelineCreated();
       metrics.createPerPipelineMetrics(pipeline);
     } finally {
       lock.writeLock().unlock();
@@ -380,6 +398,45 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   /**
+   * Waits for a pipeline to become OPEN, polling its state every 100 ms.
+   *
+   * @param pipelineID ID of the pipeline to wait for.
+   * @param timeout    wait timeout in milliseconds, 0 to use default value
+   * @throws IOException on unknown pipeline, timeout, or interruption
+   */
+  @Override
+  public void waitPipelineReady(PipelineID pipelineID, long timeout)
+      throws IOException {
+    long st = Time.monotonicNow();
+    if (timeout == 0) {
+      timeout = pipelineWaitDefaultTimeout;
+    }
+    boolean ready;
+    do {
+      try {
+        ready = stateManager.getPipeline(pipelineID).isOpen();
+      } catch (PipelineNotFoundException e) {
+        throw new PipelineNotFoundException(String.format(
+            "Pipeline %s cannot be found", pipelineID));
+      }
+      if (!ready) {
+        try {
+          Thread.sleep((long)100);
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag and abort instead of polling on.
+          Thread.currentThread().interrupt();
+          throw new IOException(String.format(
+              "Interrupted while waiting for pipeline %s", pipelineID), e);
+        }
+      }
+    } while (!ready && Time.monotonicNow() - st < timeout);
+    if (!ready) {
+      throw new IOException(String.format("Pipeline %s is not ready in %d ms",
+          pipelineID, timeout));
+    }
+  }
+
+  /**
    * Moves the pipeline to CLOSED state and sends close container command for
    * all the containers in the pipeline.
    *
@@ -408,7 +465,7 @@ public class SCMPipelineManager implements PipelineManager {
    * @throws IOException
    */
   private void destroyPipeline(Pipeline pipeline) throws IOException {
-    RatisPipelineUtils.destroyPipeline(pipeline, conf, grpcTlsConfig);
+    pipelineFactory.close(pipeline.getType(), pipeline);
     // remove the pipeline from the pipeline manager
     removePipeline(pipeline.getId());
     triggerPipelineCreation();
@@ -441,11 +498,6 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   @Override
-  public GrpcTlsConfig getGrpcTlsConfig() {
-    return grpcTlsConfig;
-  }
-
-  @Override
   public void close() throws IOException {
     if (scheduler != null) {
       scheduler.close();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index b6a1445..40a6f29 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -47,6 +47,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
 
   private MetricsRegistry registry;
 
+  private @Metric MutableCounterLong numPipelineAllocated;
   private @Metric MutableCounterLong numPipelineCreated;
   private @Metric MutableCounterLong numPipelineCreationFailed;
   private @Metric MutableCounterLong numPipelineDestroyed;
@@ -84,6 +85,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
   @SuppressWarnings("SuspiciousMethodCalls")
   public void getMetrics(MetricsCollector collector, boolean all) {
     MetricsRecordBuilder recordBuilder = collector.addRecord(SOURCE_NAME);
+    numPipelineAllocated.snapshot(recordBuilder, true);
     numPipelineCreated.snapshot(recordBuilder, true);
     numPipelineCreationFailed.snapshot(recordBuilder, true);
     numPipelineDestroyed.snapshot(recordBuilder, true);
@@ -118,6 +120,14 @@ public final class SCMPipelineMetrics implements MetricsSource {
   }
 
   /**
+   * Increments the pipeline allocation count, including both succeeded
+   * and failed allocations.
+   */
+  void incNumPipelineAllocated() {
+    numPipelineAllocated.incr();
+  }
+
+  /**
    * Increments number of successful pipeline creation count.
    */
   void incNumPipelineCreated() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index ab98dfa..00cb7ae 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -74,6 +74,11 @@ public class SimplePipelineProvider implements PipelineProvider {
   }
 
   @Override
+  public void close(Pipeline pipeline) throws IOException {
+    // Do nothing.
+  }
+
+  @Override
   public void shutdown() {
     // Do nothing.
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 2f9a66f..9b19acf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -17,27 +17,19 @@
  */
 package org.apache.hadoop.hdds.scm.safemode;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 
 /**
  * Class defining Safe mode exit criteria for Pipelines.
@@ -47,43 +39,55 @@ import com.google.common.base.Preconditions;
  * through in a cluster.
  */
 public class HealthyPipelineSafeModeRule
-    extends SafeModeExitRule<PipelineReportFromDatanode>{
+    extends SafeModeExitRule<Pipeline>{
 
   public static final Logger LOG =
       LoggerFactory.getLogger(HealthyPipelineSafeModeRule.class);
-  private final PipelineManager pipelineManager;
   private int healthyPipelineThresholdCount;
   private int currentHealthyPipelineCount = 0;
-  private final Map<PipelineID, Boolean> processedPipelines = new HashMap<>();
   private final double healthyPipelinesPercent;
 
   HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
       PipelineManager pipelineManager,
       SCMSafeModeManager manager, Configuration configuration) {
     super(manager, ruleName, eventQueue);
-    this.pipelineManager = pipelineManager;
     healthyPipelinesPercent =
         configuration.getDouble(HddsConfigKeys.
                 HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
             HddsConfigKeys.
                 HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT);
 
+    int minHealthyPipelines = 0;
+
+    boolean createPipelineInSafemode = configuration.getBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
+
+    if (createPipelineInSafemode) {
+      minHealthyPipelines =
+          configuration.getInt(HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE,
+              HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT);
+    }
+
     Preconditions.checkArgument(
         (healthyPipelinesPercent >= 0.0 && healthyPipelinesPercent <= 1.0),
         HddsConfigKeys.
             HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT
             + " value should be >= 0.0 and <= 1.0");
 
-    // As we want to wait for 3 node pipelines
-    int pipelineCount =
+    // We want to wait for RATIS THREE factor write pipelines
+    int pipelineCount = pipelineManager.getPipelines(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE,
+        Pipeline.PipelineState.OPEN).size() +
         pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE).size();
+            HddsProtos.ReplicationFactor.THREE,
+            Pipeline.PipelineState.ALLOCATED).size();
 
     // This value will be zero when pipeline count is 0.
     // On a fresh installed cluster, there will be zero pipelines in the SCM
     // pipeline DB.
-    healthyPipelineThresholdCount =
-        (int) Math.ceil(healthyPipelinesPercent * pipelineCount);
+    healthyPipelineThresholdCount = Math.max(minHealthyPipelines,
+        (int) Math.ceil(healthyPipelinesPercent * pipelineCount));
 
     LOG.info(" Total pipeline count is {}, healthy pipeline " +
         "threshold count is {}", pipelineCount, healthyPipelineThresholdCount);
@@ -99,8 +103,8 @@ public class HealthyPipelineSafeModeRule
   }
 
   @Override
-  protected TypedEvent<PipelineReportFromDatanode> getEventType() {
-    return SCMEvents.PROCESSED_PIPELINE_REPORT;
+  protected TypedEvent<Pipeline> getEventType() {
+    return SCMEvents.OPEN_PIPELINE;
   }
 
   @Override
@@ -112,38 +116,18 @@ public class HealthyPipelineSafeModeRule
   }
 
   @Override
-  protected void process(PipelineReportFromDatanode
-      pipelineReportFromDatanode) {
+  protected void process(Pipeline pipeline) {
 
     // When SCM is in safe mode for long time, already registered
-    // datanode can send pipeline report again, then pipeline handler fires
-    // processed report event, we should not consider this pipeline report
-    // from datanode again during threshold calculation.
-    Preconditions.checkNotNull(pipelineReportFromDatanode);
-
-    PipelineReportsProto pipelineReport =
-        pipelineReportFromDatanode.getReport();
-
-    for (PipelineReport report : pipelineReport.getPipelineReportList()) {
-      PipelineID pipelineID = PipelineID.getFromProtobuf(
-          report.getPipelineID());
-      Pipeline pipeline;
-      try {
-        pipeline = pipelineManager.getPipeline(pipelineID);
-      } catch (PipelineNotFoundException e) {
-        continue;
-      }
-
-      if (!processedPipelines.containsKey(pipelineID)) {
-        if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
-            report.getIsLeader()) {
-          // If the pipeline gets reported with a leader we mark it as healthy
-          currentHealthyPipelineCount++;
-          getSafeModeMetrics().incCurrentHealthyPipelinesCount();
-          processedPipelines.put(pipelineID, Boolean.TRUE);
-        }
-      }
+    // datanode can send pipeline report again, or SCMPipelineManager will
+    // create new pipelines.
+    Preconditions.checkNotNull(pipeline);
+    if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
+        pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) {
+      getSafeModeMetrics().incCurrentHealthyPipelinesCount();
+      currentHealthyPipelineCount++;
     }
+
     if (scmInSafeMode()) {
       SCMSafeModeManager.getLogger().info(
           "SCM in safe mode. Healthy pipelines reported count is {}, " +
@@ -154,7 +138,6 @@ public class HealthyPipelineSafeModeRule
 
   @Override
   protected void cleanup() {
-    processedPipelines.clear();
   }
 
   @VisibleForTesting
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
index 841d8ff..0783d02 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
@@ -22,17 +22,10 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.
-    PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.slf4j.Logger;
@@ -40,6 +33,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.HashSet;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * This rule covers whether we have at least one datanode is reported for each
@@ -47,14 +41,14 @@ import java.util.Set;
  * replica available for read when we exit safe mode.
  */
 public class OneReplicaPipelineSafeModeRule extends
-    SafeModeExitRule<PipelineReportFromDatanode> {
+    SafeModeExitRule<Pipeline> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(OneReplicaPipelineSafeModeRule.class);
 
   private int thresholdCount;
   private Set<PipelineID> reportedPipelineIDSet = new HashSet<>();
-  private final PipelineManager pipelineManager;
+  private Set<PipelineID> oldPipelineIDSet;
   private int currentReportedPipelineCount = 0;
 
 
@@ -62,7 +56,6 @@ public class OneReplicaPipelineSafeModeRule extends
       PipelineManager pipelineManager,
       SCMSafeModeManager safeModeManager, Configuration configuration) {
     super(safeModeManager, ruleName, eventQueue);
-    this.pipelineManager = pipelineManager;
 
     double percent =
         configuration.getDouble(
@@ -75,24 +68,25 @@ public class OneReplicaPipelineSafeModeRule extends
             HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT  +
             " value should be >= 0.0 and <= 1.0");
 
-    int totalPipelineCount =
-        pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE).size();
+    oldPipelineIDSet = pipelineManager.getPipelines(
+        HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.THREE)
+        .stream().map(p -> p.getId()).collect(Collectors.toSet());
+    int totalPipelineCount = oldPipelineIDSet.size();
 
     thresholdCount = (int) Math.ceil(percent * totalPipelineCount);
 
-    LOG.info(" Total pipeline count is {}, pipeline's with atleast one " +
+    LOG.info("Total pipeline count is {}, pipeline's with at least one " +
         "datanode reported threshold count is {}", totalPipelineCount,
         thresholdCount);
 
     getSafeModeMetrics().setNumPipelinesWithAtleastOneReplicaReportedThreshold(
         thresholdCount);
-
   }
 
   @Override
-  protected TypedEvent<PipelineReportFromDatanode> getEventType() {
-    return SCMEvents.PROCESSED_PIPELINE_REPORT;
+  protected TypedEvent<Pipeline> getEventType() {
+    return SCMEvents.OPEN_PIPELINE;
   }
 
   @Override
@@ -104,40 +98,26 @@ public class OneReplicaPipelineSafeModeRule extends
   }
 
   @Override
-  protected void process(PipelineReportFromDatanode
-      pipelineReportFromDatanode) {
-    Pipeline pipeline;
-    Preconditions.checkNotNull(pipelineReportFromDatanode);
-    PipelineReportsProto pipelineReport =
-        pipelineReportFromDatanode.getReport();
-
-    for (PipelineReport report : pipelineReport.getPipelineReportList()) {
-      PipelineID pipelineID = PipelineID
-          .getFromProtobuf(report.getPipelineID());
-      try {
-        pipeline = pipelineManager.getPipeline(pipelineID);
-      } catch (PipelineNotFoundException e) {
-        continue;
-      }
-
-      if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
-          !reportedPipelineIDSet.contains(pipelineID)) {
-        reportedPipelineIDSet.add(pipelineID);
+  protected void process(Pipeline pipeline) {
+    Preconditions.checkNotNull(pipeline);
+    if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
+        pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+        !reportedPipelineIDSet.contains(pipeline.getId())) {
+      if (oldPipelineIDSet.contains(pipeline.getId())) {
         getSafeModeMetrics()
             .incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount();
+        currentReportedPipelineCount++;
+        reportedPipelineIDSet.add(pipeline.getId());
       }
     }
 
-    currentReportedPipelineCount = reportedPipelineIDSet.size();
-
     if (scmInSafeMode()) {
       SCMSafeModeManager.getLogger().info(
-          "SCM in safe mode. Pipelines with atleast one datanode reported " +
-              "count is {}, required atleast one datanode reported per " +
+          "SCM in safe mode. Pipelines with at least one datanode reported " +
+              "count is {}, required at least one datanode reported per " +
               "pipeline count is {}",
           currentReportedPipelineCount, thresholdCount);
     }
-
   }
 
   @Override
@@ -154,5 +134,4 @@ public class OneReplicaPipelineSafeModeRule extends
   public int getCurrentReportedPipelineCount() {
     return currentReportedPipelineCount;
   }
-
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
index a22d162..1e83bc4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
@@ -59,17 +59,17 @@ import org.slf4j.LoggerFactory;
  * number of datanode registered is met or not.
  *
  * 3. HealthyPipelineSafeModeRule:
- * Once the pipelineReportHandler processes the
+ * Once the PipelineReportHandler processes the
  * {@link SCMEvents#PIPELINE_REPORT}, it fires
- * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
+ * {@link SCMEvents#OPEN_PIPELINE}. This rule handles this
  * event. This rule processes this report, and check if pipeline is healthy
  * and increments current healthy pipeline count. Then validate it cutoff
  * threshold for healthy pipeline is met or not.
  *
  * 4. OneReplicaPipelineSafeModeRule:
- * Once the pipelineReportHandler processes the
+ * Once the PipelineReportHandler processes the
  * {@link SCMEvents#PIPELINE_REPORT}, it fires
- * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
+ * {@link SCMEvents#OPEN_PIPELINE}. This rule handles this
  * event. This rule processes this report, and add the reported pipeline to
  * reported pipeline set. Then validate it cutoff threshold for one replica
  * per pipeline is met or not.
@@ -135,6 +135,13 @@ public class SCMSafeModeManager {
             oneReplicaPipelineSafeModeRule);
       }
       emitSafeModeStatus();
+      boolean createPipelineInSafemode = conf.getBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
+
+      if (createPipelineInSafemode) {
+        pipelineManager.startPipelineCreator();
+      }
     } else {
       this.safeModeMetrics = null;
       exitSafeMode(eventQueue);
@@ -166,6 +173,7 @@ public class SCMSafeModeManager {
 
     if (exitRules.get(ruleName) != null) {
       validatedRules.add(ruleName);
+      LOG.info("{} rule is successfully validated", ruleName);
     } else {
       // This should never happen
       LOG.error("No Such Exit rule {}", ruleName);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
index 44d1c94..2fbe893 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
@@ -128,7 +128,8 @@ public class SafeModeHandler implements EventHandler<SafeModeStatus> {
     List<Pipeline> pipelineList = scmPipelineManager.getPipelines();
     pipelineList.forEach((pipeline) -> {
       try {
-        if (!pipeline.isHealthy()) {
+        if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
+            pipeline.isAllocationTimeout()) {
           scmPipelineManager.finalizeAndDestroyPipeline(pipeline, false);
         }
       } catch (IOException ex) {
@@ -141,6 +142,4 @@ public class SafeModeHandler implements EventHandler<SafeModeStatus> {
   public boolean getSafeModeStatus() {
     return isInSafeMode.get();
   }
-
-
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 9f6077b..3dbb4cb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -164,6 +164,7 @@ public final class SCMDatanodeHeartbeatDispatcher {
       }
 
       if (heartbeat.getCommandStatusReportsCount() != 0) {
+        LOG.debug("Dispatching Command Status Report.");
         for (CommandStatusReportsProto commandStatusReport : heartbeat
             .getCommandStatusReportsList()) {
           eventPublisher.fireEvent(
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 530c0a6..901bc2c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -61,6 +61,8 @@ import org.apache.hadoop.ozone.audit.Auditor;
 import org.apache.hadoop.ozone.audit.SCMAction;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@@ -79,6 +81,12 @@ import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProt
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.deleteContainerCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.replicateContainerCommand;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.reregisterCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
+    .createPipelineCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type
+    .closePipelineCommand;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DATANODE_ADDRESS_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_DEFAULT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_HANDLER_COUNT_KEY;
@@ -329,6 +337,18 @@ public class SCMDatanodeProtocolServer implements
           .setReplicateContainerCommandProto(
               ((ReplicateContainerCommand)cmd).getProto())
           .build();
+    case createPipelineCommand:
+      return builder
+          .setCommandType(createPipelineCommand)
+          .setCreatePipelineCommandProto(
+              ((CreatePipelineCommand)cmd).getProto())
+          .build();
+    case closePipelineCommand:
+      return builder
+          .setCommandType(closePipelineCommand)
+          .setClosePipelineCommandProto(
+              ((ClosePipelineCommand)cmd).getProto())
+          .build();
     default:
       throw new IllegalArgumentException("Scm command " +
           cmd.getType().toString() + " is not implemented");
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 21127f4..16ea094 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -400,8 +400,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       pipelineManager = configurator.getPipelineManager();
     } else {
       pipelineManager =
-          new SCMPipelineManager(conf, scmNodeManager, eventQueue,
-              grpcTlsConfig);
+          new SCMPipelineManager(conf, scmNodeManager, eventQueue);
     }
 
     if (configurator.getContainerManager() != null) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 1cb8376..baeb6dc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server
     .SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
@@ -369,6 +371,16 @@ public final class TestUtils {
     return new PipelineReportFromDatanode(dn, reportBuilder.build());
   }
 
+  public static void openAllRatisPipelines(PipelineManager pipelineManager)
+      throws IOException {
+    // Pipelines are created by a background thread
+    List<Pipeline> pipelines =
+        pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS);
+    // Trigger the processed pipeline report event
+    for (Pipeline pipeline : pipelines) {
+      pipelineManager.openPipeline(pipeline.getId());
+    }
+  }
 
   public static PipelineActionsFromDatanode getPipelineActionFromDatanode(
       DatanodeDetails dn, PipelineID... pipelineIDs) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index a012d64..aa190f4 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -24,11 +24,14 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
@@ -45,9 +48,13 @@ import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -94,14 +101,18 @@ public class TestBlockManager {
 
 
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, folder.newFolder().toString());
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+    conf.setTimeDuration(HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL, 5,
+        TimeUnit.SECONDS);
 
     // Override the default Node Manager in SCM with this Mock Node Manager.
     nodeManager = new MockNodeManager(true, 10);
+    eventQueue = new EventQueue();
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, eventQueue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
-            pipelineManager.getStateManager(), conf);
+            pipelineManager.getStateManager(), conf, eventQueue);
     pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
     SCMConfigurator configurator = new SCMConfigurator();
@@ -112,12 +123,10 @@ public class TestBlockManager {
     // Initialize these fields so that the tests can pass.
     mapping = (SCMContainerManager) scm.getContainerManager();
     blockManager = (BlockManagerImpl) scm.getScmBlockManager();
-
-    eventQueue = new EventQueue();
-    eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS,
-        scm.getSafeModeHandler());
     eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS,
         scm.getSafeModeHandler());
+    DatanodeCommandHandler handler = new DatanodeCommandHandler();
+    eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, handler);
     CloseContainerEventHandler closeContainerHandler =
         new CloseContainerEventHandler(pipelineManager, mapping);
     eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
@@ -136,6 +145,8 @@ public class TestBlockManager {
     GenericTestUtils.waitFor(() -> {
       return !blockManager.isScmInSafeMode();
     }, 10, 1000 * 5);
+    pipelineManager.createPipeline(type, factor);
+    TestUtils.openAllRatisPipelines(pipelineManager);
     AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE,
         type, factor, OzoneConsts.OZONE, new ExcludeList());
     Assert.assertNotNull(block);
@@ -153,6 +164,7 @@ public class TestBlockManager {
       }
     } catch (IOException e) {
     }
+    TestUtils.openAllRatisPipelines(pipelineManager);
     ExcludeList excludeList = new ExcludeList();
     excludeList
         .addPipeline(pipelineManager.getPipelines(type, factor).get(0).getId());
@@ -259,6 +271,7 @@ public class TestBlockManager {
 
     pipelineManager.createPipeline(type, factor);
     pipelineManager.createPipeline(type, factor);
+    TestUtils.openAllRatisPipelines(pipelineManager);
 
     AllocatedBlock allocatedBlock = blockManager
         .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, OzoneConsts.OZONE,
@@ -305,6 +318,7 @@ public class TestBlockManager {
              .getNumber(); i++) {
       pipelineManager.createPipeline(type, factor);
     }
+    TestUtils.openAllRatisPipelines(pipelineManager);
 
     // wait till each pipeline has the configured number of containers.
     // After this each pipeline has numContainerPerOwnerInPipeline containers
@@ -359,7 +373,23 @@ public class TestBlockManager {
     Assert.assertNotNull(blockManager
         .allocateBlock(DEFAULT_BLOCK_SIZE, type, factor, OzoneConsts.OZONE,
             new ExcludeList()));
-    Assert.assertEquals(1, pipelineManager.getPipelines(type, factor).size());
   }
 
+  private class DatanodeCommandHandler implements
+      EventHandler<CommandForDatanode> {
+
+    @Override
+    public void onMessage(final CommandForDatanode command,
+                          final EventPublisher publisher) {
+      final SCMCommandProto.Type commandType = command.getCommand().getType();
+      if (commandType == SCMCommandProto.Type.createPipelineCommand) {
+        CreatePipelineCommand createCommand =
+            (CreatePipelineCommand) command.getCommand();
+        try {
+          pipelineManager.openPipeline(createCommand.getPipelineID());
+        } catch (IOException e) {
+        }
+      }
+    }
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index b022fd9..4f503e4 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
@@ -67,8 +68,9 @@ public class TestCloseContainerEventHandler {
     configuration
         .set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
     nodeManager = new MockNodeManager(true, 10);
+    eventQueue = new EventQueue();
     pipelineManager =
-        new SCMPipelineManager(configuration, nodeManager, eventQueue, null);
+        new SCMPipelineManager(configuration, nodeManager, eventQueue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), configuration);
@@ -77,10 +79,13 @@ public class TestCloseContainerEventHandler {
     containerManager = new
         SCMContainerManager(configuration, nodeManager,
         pipelineManager, new EventQueue());
-    eventQueue = new EventQueue();
+    pipelineManager.triggerPipelineCreation();
     eventQueue.addHandler(CLOSE_CONTAINER,
         new CloseContainerEventHandler(pipelineManager, containerManager));
     eventQueue.addHandler(DATANODE_COMMAND, nodeManager);
+    // Move pipelines created in the background from ALLOCATED to OPEN state
+    Thread.sleep(2000);
+    TestUtils.openAllRatisPipelines(pipelineManager);
   }
 
   @AfterClass
@@ -116,7 +121,6 @@ public class TestCloseContainerEventHandler {
 
   @Test
   public void testCloseContainerEventWithValidContainers() throws IOException {
-
     ContainerInfo container = containerManager
         .allocateContainer(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.ONE, OzoneConsts.OZONE);
@@ -134,7 +138,6 @@ public class TestCloseContainerEventHandler {
 
   @Test
   public void testCloseContainerEventWithRatis() throws IOException {
-
     GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
         .captureLogs(CloseContainerEventHandler.LOG);
     ContainerInfo container = containerManager
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
index 6436af0..fde94d7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
@@ -96,7 +96,7 @@ public class TestSCMContainerManager {
     }
     nodeManager = new MockNodeManager(true, 10);
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     containerManager = new SCMContainerManager(conf, nodeManager,
         pipelineManager, new EventQueue());
     xceiverClientManager = new XceiverClientManager(conf);
@@ -147,7 +147,7 @@ public class TestSCMContainerManager {
           containerInfo.getPipelineID()).getFirstNode()
           .getUuid());
     }
-    Assert.assertTrue(pipelineList.size() > 5);
+    Assert.assertTrue(pipelineList.size() >= 1);
   }
 
   @Test
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 3e4508d..f786060 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -108,7 +108,7 @@ public class TestContainerPlacement {
     final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
         OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
     PipelineManager pipelineManager =
-        new SCMPipelineManager(config, scmNodeManager, eventQueue, null);
+        new SCMPipelineManager(config, scmNodeManager, eventQueue);
     return new SCMContainerManager(config, scmNodeManager, pipelineManager,
         eventQueue);
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index c140119..1676af1 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -74,6 +74,7 @@ public class TestDeadNodeHandler {
   private SCMNodeManager nodeManager;
   private ContainerManager containerManager;
   private NodeReportHandler nodeReportHandler;
+  private SCMPipelineManager pipelineManager;
   private DeadNodeHandler deadNodeHandler;
   private EventPublisher publisher;
   private EventQueue eventQueue;
@@ -88,12 +89,12 @@ public class TestDeadNodeHandler {
     eventQueue = new EventQueue();
     scm = HddsTestUtils.getScm(conf);
     nodeManager = (SCMNodeManager) scm.getScmNodeManager();
-    SCMPipelineManager manager =
+    pipelineManager =
         (SCMPipelineManager)scm.getPipelineManager();
     PipelineProvider mockRatisProvider =
-        new MockRatisPipelineProvider(nodeManager, manager.getStateManager(),
-            conf);
-    manager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        new MockRatisPipelineProvider(nodeManager,
+            pipelineManager.getStateManager(), conf);
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
     containerManager = scm.getContainerManager();
     deadNodeHandler = new DeadNodeHandler(nodeManager,
@@ -149,6 +150,8 @@ public class TestDeadNodeHandler {
     nodeManager.register(TestUtils.randomDatanodeDetails(),
         TestUtils.createNodeReport(storageOne), null);
 
+    TestUtils.openAllRatisPipelines(pipelineManager);
+
     ContainerInfo container1 =
         TestUtils.allocateContainer(containerManager);
     ContainerInfo container2 =
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index db76d66..f4eb797 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -121,6 +121,7 @@ public class TestSCMNodeManager {
         testDir.getAbsolutePath());
     conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
         TimeUnit.MILLISECONDS);
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     return conf;
   }
 
@@ -1035,9 +1036,11 @@ public class TestSCMNodeManager {
       eq.processAll(1000L);
       List<SCMCommand> command =
           nodemanager.processHeartbeat(datanodeDetails);
-      Assert.assertEquals(1, command.size());
-      Assert
-          .assertEquals(command.get(0).getClass(), CloseContainerCommand.class);
+      // With dn registered, SCM will send create pipeline command to dn
+      Assert.assertTrue(command.size() >= 1);
+      Assert.assertTrue(command.get(0).getClass().equals(
+          CloseContainerCommand.class) ||
+          command.get(1).getClass().equals(CloseContainerCommand.class));
     } catch (IOException e) {
       e.printStackTrace();
       throw  e;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 342ee5b..25b0adc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -22,6 +22,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
 
 import java.io.IOException;
 import java.util.List;
@@ -34,7 +36,13 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
   public MockRatisPipelineProvider(NodeManager nodeManager,
                             PipelineStateManager stateManager,
                             Configuration conf) {
-    super(nodeManager, stateManager, conf, null);
+    super(nodeManager, stateManager, conf, new EventQueue());
+  }
+
+  public MockRatisPipelineProvider(NodeManager nodeManager,
+      PipelineStateManager stateManager, Configuration conf,
+      EventPublisher eventPublisher) {
+    super(nodeManager, stateManager, conf, eventPublisher);
   }
 
   protected void initializePipeline(Pipeline pipeline) throws IOException {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 6f0425d..065b08b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -61,11 +61,13 @@ public class TestRatisPipelineProvider {
   private void createPipelineAndAssertions(
           HddsProtos.ReplicationFactor factor) throws IOException {
     Pipeline pipeline = provider.create(factor);
-    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
+    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
+        Pipeline.PipelineState.ALLOCATED);
     stateManager.addPipeline(pipeline);
 
     Pipeline pipeline1 = provider.create(factor);
-    assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE);
+    assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE,
+        Pipeline.PipelineState.ALLOCATED);
     // New pipeline should not overlap with the previous created pipeline
     assertTrue(
         intersection(pipeline.getNodes(), pipeline1.getNodes())
@@ -77,12 +79,14 @@ public class TestRatisPipelineProvider {
   public void testCreatePipelineWithFactor() throws IOException {
     HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
     Pipeline pipeline = provider.create(factor);
-    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
+    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
+        Pipeline.PipelineState.ALLOCATED);
     stateManager.addPipeline(pipeline);
 
     factor = HddsProtos.ReplicationFactor.ONE;
     Pipeline pipeline1 = provider.create(factor);
-    assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE);
+    assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE,
+        Pipeline.PipelineState.ALLOCATED);
     stateManager.addPipeline(pipeline1);
     // New pipeline should overlap with the previous created pipeline,
     // and one datanode should overlap between the two types.
@@ -113,11 +117,13 @@ public class TestRatisPipelineProvider {
     HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
     Pipeline pipeline =
         provider.create(factor, createListOfNodes(factor.getNumber()));
-    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
+    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
+        Pipeline.PipelineState.OPEN);
 
     factor = HddsProtos.ReplicationFactor.ONE;
     pipeline = provider.create(factor, createListOfNodes(factor.getNumber()));
-    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
+    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
+        Pipeline.PipelineState.OPEN);
   }
 
   @Test
@@ -141,7 +147,8 @@ public class TestRatisPipelineProvider {
 
     // only 2 healthy DNs left that are not part of any pipeline
     Pipeline pipeline = provider.create(factor);
-    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE);
+    assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
+        Pipeline.PipelineState.ALLOCATED);
 
     List<DatanodeDetails> nodes = pipeline.getNodes();
 
@@ -156,8 +163,9 @@ public class TestRatisPipelineProvider {
 
   private static void assertPipelineProperties(
       Pipeline pipeline, HddsProtos.ReplicationFactor expectedFactor,
-      HddsProtos.ReplicationType expectedReplicationType) {
-    assertEquals(Pipeline.PipelineState.OPEN, pipeline.getPipelineState());
+      HddsProtos.ReplicationType expectedReplicationType,
+      Pipeline.PipelineState expectedState) {
+    assertEquals(expectedState, pipeline.getPipelineState());
     assertEquals(expectedReplicationType, pipeline.getType());
     assertEquals(expectedFactor, pipeline.getFactor());
     assertEquals(expectedFactor.getNumber(), pipeline.getNodes().size());
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index f6d9b0e..6ea1bfe 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -22,10 +22,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -34,7 +30,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -68,10 +63,11 @@ public class TestHealthyPipelineSafeModeRule {
       // enable pipeline check
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+      config.setBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue, null);
+          nodeManager, eventQueue);
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
               pipelineManager.getStateManager(), config);
@@ -88,10 +84,8 @@ public class TestHealthyPipelineSafeModeRule {
     } finally {
       FileUtil.fullyDelete(new File(storageDir));
     }
-
   }
 
-
   @Test
   public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception {
 
@@ -113,10 +107,11 @@ public class TestHealthyPipelineSafeModeRule {
       // enable pipeline check
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+      config.setBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue, null);
+          nodeManager, eventQueue);
 
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
@@ -162,7 +157,6 @@ public class TestHealthyPipelineSafeModeRule {
     } finally {
       FileUtil.fullyDelete(new File(storageDir));
     }
-
   }
 
 
@@ -188,10 +182,11 @@ public class TestHealthyPipelineSafeModeRule {
       // enable pipeline check
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+      config.setBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, eventQueue, null);
+          nodeManager, eventQueue);
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
               pipelineManager.getStateManager(), config);
@@ -217,7 +212,7 @@ public class TestHealthyPipelineSafeModeRule {
           scmSafeModeManager.getHealthyPipelineSafeModeRule();
 
 
-      // No datanodes have sent pipelinereport from datanode
+      // No pipeline event has been sent to SCMSafeModeManager yet
       Assert.assertFalse(healthyPipelineSafeModeRule.validate());
 
 
@@ -225,12 +220,12 @@ public class TestHealthyPipelineSafeModeRule {
           GenericTestUtils.LogCapturer.captureLogs(LoggerFactory.getLogger(
               SCMSafeModeManager.class));
 
-      // fire event with pipeline report with ratis type and factor 1
+      // fire event with pipeline create status with ratis type and factor 1
       // pipeline, validate() should return false
       firePipelineEvent(pipeline1, eventQueue);
 
       GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains(
-          "reported count is 0"),
+          "reported count is 1"),
           1000, 5000);
       Assert.assertFalse(healthyPipelineSafeModeRule.validate());
 
@@ -246,20 +241,7 @@ public class TestHealthyPipelineSafeModeRule {
 
   }
 
-
   private void firePipelineEvent(Pipeline pipeline, EventQueue eventQueue) {
-    PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
-        .newBuilder();
-
-    reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-        .setPipelineID(pipeline.getId().getProtobuf())
-        .setIsLeader(Boolean.TRUE));
-
-    // Here no need to fire event from 3 nodes, as already pipeline is in
-    // open state, but doing it.
-    eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-        new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-            pipeline.getNodes().get(0), reportBuilder.build()));
+    eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
   }
-
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index 7a09977..0fa5eae 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@ -20,10 +20,6 @@ package org.apache.hadoop.hdds.scm.safemode;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -32,7 +28,6 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -63,6 +58,8 @@ public class TestOneReplicaPipelineSafeModeRule {
         HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
     ozoneConfiguration.set(HddsConfigKeys.OZONE_METADATA_DIRS,
         folder.newFolder().toString());
+    ozoneConfiguration.setBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
     List<ContainerInfo> containers = new ArrayList<>();
     containers.addAll(HddsTestUtils.getContainerInfo(1));
@@ -71,7 +68,7 @@ public class TestOneReplicaPipelineSafeModeRule {
     eventQueue = new EventQueue();
     pipelineManager =
         new SCMPipelineManager(ozoneConfiguration, mockNodeManager,
-            eventQueue, null);
+            eventQueue);
 
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(mockNodeManager,
@@ -123,7 +120,6 @@ public class TestOneReplicaPipelineSafeModeRule {
     firePipelineEvent(pipelines.get(pipelineFactorThreeCount - 1));
 
     GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
-
   }
 
 
@@ -170,11 +166,8 @@ public class TestOneReplicaPipelineSafeModeRule {
     firePipelineEvent(pipelines.get(pipelineCountThree - 1));
 
     GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
-
   }
 
-
-
   private void createPipelines(int count,
       HddsProtos.ReplicationFactor factor) throws Exception {
     for (int i = 0; i < count; i++) {
@@ -184,27 +177,6 @@ public class TestOneReplicaPipelineSafeModeRule {
   }
 
   private void firePipelineEvent(Pipeline pipeline) {
-    PipelineReportsProto.Builder reportBuilder =
-        PipelineReportsProto.newBuilder();
-
-    reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-        .setPipelineID(pipeline.getId().getProtobuf())
-        .setIsLeader(Boolean.TRUE));
-
-    if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) {
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(0), reportBuilder.build()));
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(1), reportBuilder.build()));
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(2), reportBuilder.build()));
-    } else {
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(0), reportBuilder.build()));
-    }
+    eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
   }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 1e608b3..b5839bc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -32,8 +32,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -43,7 +41,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -73,6 +70,8 @@ public class TestSCMSafeModeManager {
   public static void setUp() {
     queue = new EventQueue();
     config = new OzoneConfiguration();
+    config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        false);
   }
 
   @Test
@@ -177,7 +176,7 @@ public class TestSCMSafeModeManager {
         HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT, healthyPercent);
     conf.setDouble(HddsConfigKeys.
         HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT, oneReplicaPercent);
-
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     return conf;
   }
 
@@ -199,7 +198,7 @@ public class TestSCMSafeModeManager {
           0.9);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue, null);
+          mockNodeManager, queue);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForHealthyPipelinePercent");
@@ -217,7 +216,7 @@ public class TestSCMSafeModeManager {
           200);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue, null);
+          mockNodeManager, queue);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForOneReplicaPipelinePercent");
@@ -234,7 +233,7 @@ public class TestSCMSafeModeManager {
       conf.setDouble(HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT, -1.0);
       MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
       PipelineManager pipelineManager = new SCMPipelineManager(conf,
-          mockNodeManager, queue, null);
+          mockNodeManager, queue);
       scmSafeModeManager = new SCMSafeModeManager(
           conf, containers, pipelineManager, queue);
       fail("testFailWithIncorrectValueForSafeModePercent");
@@ -258,7 +257,7 @@ public class TestSCMSafeModeManager {
 
     MockNodeManager mockNodeManager = new MockNodeManager(true, nodeCount);
     SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
-        mockNodeManager, queue, null);
+        mockNodeManager, queue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(mockNodeManager,
             pipelineManager.getStateManager(), config);
@@ -302,12 +301,12 @@ public class TestSCMSafeModeManager {
     // we shall a get an event when datanode is registered. In that case,
     // validate will return true, and add this to validatedRules.
     if (Math.max(healthyPipelinePercent, oneReplicaThresholdCount) == 0) {
-      firePipelineEvent(pipelines.get(0));
+      firePipelineEvent(pipelineManager, pipelines.get(0));
     }
 
     for (int i = 0; i < Math.max(healthyPipelineThresholdCount,
-        oneReplicaThresholdCount); i++) {
-      firePipelineEvent(pipelines.get(i));
+        Math.min(oneReplicaThresholdCount, pipelines.size())); i++) {
+      firePipelineEvent(pipelineManager, pipelines.get(i));
 
       if (i < healthyPipelineThresholdCount) {
         checkHealthy(i + 1);
@@ -352,16 +351,11 @@ public class TestSCMSafeModeManager {
         1000,  5000);
   }
 
-  private void firePipelineEvent(Pipeline pipeline) throws Exception {
-    PipelineReportsProto.Builder reportBuilder =
-        PipelineReportsProto.newBuilder();
-
-    reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-        .setPipelineID(pipeline.getId().getProtobuf())
-        .setIsLeader(Boolean.TRUE));
-    queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-        new PipelineReportFromDatanode(pipeline.getNodes().get(0),
-            reportBuilder.build()));
+  private void firePipelineEvent(SCMPipelineManager pipelineManager,
+      Pipeline pipeline) throws Exception {
+    pipelineManager.openPipeline(pipeline.getId());
+    queue.fireEvent(SCMEvents.OPEN_PIPELINE,
+        pipelineManager.getPipeline(pipeline.getId()));
   }
 
 
@@ -480,7 +474,7 @@ public class TestSCMSafeModeManager {
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
-          nodeManager, queue, null);
+          nodeManager, queue);
 
       PipelineProvider mockRatisProvider =
           new MockRatisPipelineProvider(nodeManager,
@@ -491,11 +485,6 @@ public class TestSCMSafeModeManager {
       Pipeline pipeline = pipelineManager.createPipeline(
           HddsProtos.ReplicationType.RATIS,
           HddsProtos.ReplicationFactor.THREE);
-      PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
-          .newBuilder();
-      reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-          .setPipelineID(pipeline.getId().getProtobuf())
-          .setIsLeader(Boolean.TRUE));
 
       scmSafeModeManager = new SCMSafeModeManager(
           config, containers, pipelineManager, queue);
@@ -504,10 +493,9 @@ public class TestSCMSafeModeManager {
           HddsTestUtils.createNodeRegistrationContainerReport(containers));
       assertTrue(scmSafeModeManager.getInSafeMode());
 
-      // Trigger the processed pipeline report event
-      queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new PipelineReportFromDatanode(pipeline.getNodes().get(0),
-              reportBuilder.build()));
+
+
+      firePipelineEvent(pipelineManager, pipeline);
 
       GenericTestUtils.waitFor(() -> {
         return !scmSafeModeManager.getInSafeMode();
diff --git a/hadoop-ozone/dist/src/main/compose/testlib.sh b/hadoop-ozone/dist/src/main/compose/testlib.sh
index b20dca8..684f2f5 100755
--- a/hadoop-ozone/dist/src/main/compose/testlib.sh
+++ b/hadoop-ozone/dist/src/main/compose/testlib.sh
@@ -80,6 +80,40 @@ wait_for_datanodes(){
    return 1
 }
 
+## @description wait until safemode exit (or 90 seconds)
+## @param the docker-compose file
+wait_for_safemode_exit(){
+  local compose_file=$1
+
+  #Reset the timer
+  SECONDS=0
+
+  #Don't give up until 90 seconds have passed
+  while [[ $SECONDS -lt 90 ]]; do
+
+     #This line checks the safemode status in scm
+     local command="ozone scmcli safemode status"
+     if [[ "${SECURITY_ENABLED}" == 'true' ]]; then
+         status=`docker-compose -f "${compose_file}" exec -T scm bash -c "kinit -k HTTP/scm@EXAMPLE.COM -t /etc/security/keytabs/HTTP.keytab && $command'"`
+     else
+         status=`docker-compose -f "${compose_file}" exec -T scm bash -c "$command"`
+     fi
+
+     echo $status
+     if [[ "$status" ]]; then
+       if [[ ${status} == "SCM is out of safe mode." ]]; then
+         #Safemode exits. Let's return from the function.
+         echo "Safe mode is off"
+         return
+       fi
+     fi
+
+     sleep 2
+   done
+   echo "WARNING! Safemode is still on. Please check the docker-compose files"
+   return 1
+}
+
 ## @description  Starts a docker-compose based test environment
 ## @param number of datanodes to start and wait for (default: 3)
 start_docker_env(){
@@ -90,6 +124,7 @@ start_docker_env(){
   docker-compose -f "$COMPOSE_FILE" --no-ansi down
   docker-compose -f "$COMPOSE_FILE" --no-ansi up -d --scale datanode="${datanode_count}" \
     && wait_for_datanodes "$COMPOSE_FILE" "${datanode_count}" \
+    && wait_for_safemode_exit "$COMPOSE_FILE" \
     && sleep 10
 
   if [[ $? -gt 0 ]]; then
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
index fe612a0..b4f8c37 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
@@ -73,7 +73,7 @@ public class TestContainerStateManagerIntegration {
     numContainerPerOwnerInPipeline =
         conf.getInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
             ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT_DEFAULT);
-    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
     cluster.waitForClusterToBeReady();
     cluster.waitTobeOutOfSafeMode();
     xceiverClientManager = new XceiverClientManager(conf);
@@ -165,7 +165,9 @@ public class TestContainerStateManagerIntegration {
       }
     }
 
-    cluster.restartStorageContainerManager(true);
+    // Restarting SCM will not trigger a container report to satisfy the safe
+    // mode exit rule.
+    cluster.restartStorageContainerManager(false);
 
     List<ContainerInfo> result = cluster.getStorageContainerManager()
         .getContainerManager().listContainer(null, 100);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/metrics/TestSCMContainerManagerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/metrics/TestSCMContainerManagerMetrics.java
index f2c31d1..26c8c01 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/metrics/TestSCMContainerManagerMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/metrics/TestSCMContainerManagerMetrics.java
@@ -40,6 +40,8 @@ import java.io.IOException;
 import java.util.HashMap;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.junit.Assert.fail;
@@ -56,6 +58,7 @@ public class TestSCMContainerManagerMetrics {
   public void setup() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.set(HDDS_CONTAINER_REPORT_INTERVAL, "3000s");
+    conf.setBoolean(HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
     cluster.waitForClusterToBeReady();
     scm = cluster.getStorageContainerManager();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index c583559..21fa7bd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -81,7 +81,7 @@ public class TestPipelineClose {
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
     conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1000,
         TimeUnit.MILLISECONDS);
-    pipelineDestroyTimeoutInMillis = 5000;
+    pipelineDestroyTimeoutInMillis = 1000;
     conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
         pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS);
     cluster.waitForClusterToBeReady();
@@ -169,7 +169,7 @@ public class TestPipelineClose {
         new PipelineActionHandler(pipelineManager, conf);
     pipelineActionHandler
         .onMessage(pipelineActionsFromDatanode, new EventQueue());
-    Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 1.2));
+    Thread.sleep(5000);
     OzoneContainer ozoneContainer =
         cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
             .getContainer();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
new file mode 100644
index 0000000..8b8c64f
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Test for RatisPipelineProvider.
+ */
+public class TestRatisPipelineProvider {
+
+  private NodeManager nodeManager;
+  private PipelineProvider provider;
+  private PipelineStateManager stateManager;
+
+  @Before
+  public void init() throws Exception {
+    nodeManager = new MockNodeManager(true, 10);
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+    stateManager = new PipelineStateManager(conf);
+    provider = new MockRatisPipelineProvider(nodeManager,
+        stateManager, new OzoneConfiguration());
+  }
+
+  private void createPipelineAndAssertions(
+          HddsProtos.ReplicationFactor factor) throws IOException {
+    Pipeline pipeline = provider.create(factor);
+    stateManager.addPipeline(pipeline);
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+        pipeline.getPipelineState());
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+    Pipeline pipeline1 = provider.create(factor);
+    stateManager.addPipeline(pipeline1);
+    // New pipeline should not overlap with the previously created pipeline
+    Assert.assertTrue(
+        CollectionUtils.intersection(pipeline.getNodes(), pipeline1.getNodes())
+            .isEmpty());
+    Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline1.getFactor(), factor);
+    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+        pipeline1.getPipelineState());
+    Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
+  }
+
+  @Test
+  public void testCreatePipelineWithFactor() throws IOException {
+    HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+    Pipeline pipeline = provider.create(factor);
+    stateManager.addPipeline(pipeline);
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+        pipeline.getPipelineState());
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+
+    factor = HddsProtos.ReplicationFactor.ONE;
+    Pipeline pipeline1 = provider.create(factor);
+    stateManager.addPipeline(pipeline1);
+    // New pipeline should overlap with the previously created pipeline,
+    // and one datanode should overlap between the two types.
+    Assert.assertEquals(
+        CollectionUtils.intersection(pipeline.getNodes(),
+            pipeline1.getNodes()).size(), 1);
+    Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline1.getFactor(), factor);
+    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+        pipeline1.getPipelineState());
+    Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
+  }
+
+  @Test
+  public void testCreatePipelineWithFactorThree() throws IOException {
+    createPipelineAndAssertions(HddsProtos.ReplicationFactor.THREE);
+  }
+
+  @Test
+  public void testCreatePipelineWithFactorOne() throws IOException {
+    createPipelineAndAssertions(HddsProtos.ReplicationFactor.ONE);
+  }
+
+  private List<DatanodeDetails> createListOfNodes(int nodeCount) {
+    List<DatanodeDetails> nodes = new ArrayList<>();
+    for (int i = 0; i < nodeCount; i++) {
+      nodes.add(TestUtils.randomDatanodeDetails());
+    }
+    return nodes;
+  }
+
+  @Test
+  public void testCreatePipelineWithNodes() {
+    HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+    Pipeline pipeline =
+        provider.create(factor, createListOfNodes(factor.getNumber()));
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(
+        pipeline.getPipelineState(), Pipeline.PipelineState.OPEN);
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+
+    factor = HddsProtos.ReplicationFactor.ONE;
+    pipeline = provider.create(factor, createListOfNodes(factor.getNumber()));
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(pipeline.getPipelineState(),
+        Pipeline.PipelineState.OPEN);
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+  }
+
+  @Test
+  public void testCreatePipelinesDnExclude() throws IOException {
+
+    // We need 9 Healthy DNs in MockNodeManager.
+    NodeManager mockNodeManager = new MockNodeManager(true, 12);
+    PipelineStateManager stateManagerMock =
+        new PipelineStateManager(new OzoneConfiguration());
+    PipelineProvider providerMock = new MockRatisPipelineProvider(
+        mockNodeManager, stateManagerMock, new OzoneConfiguration());
+
+    // Use up first 3 DNs for an open pipeline.
+    List<DatanodeDetails> openPiplineDns = mockNodeManager.getAllNodes()
+        .subList(0, 3);
+    HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+
+    Pipeline openPipeline = Pipeline.newBuilder()
+        .setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(factor)
+        .setNodes(openPiplineDns)
+        .setState(Pipeline.PipelineState.OPEN)
+        .setId(PipelineID.randomId())
+        .build();
+
+    stateManagerMock.addPipeline(openPipeline);
+
+    // Use up next 3 DNs also for an open pipeline.
+    List<DatanodeDetails> moreOpenPiplineDns = mockNodeManager.getAllNodes()
+        .subList(3, 6);
+    Pipeline anotherOpenPipeline = Pipeline.newBuilder()
+        .setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(factor)
+        .setNodes(moreOpenPiplineDns)
+        .setState(Pipeline.PipelineState.OPEN)
+        .setId(PipelineID.randomId())
+        .build();
+    stateManagerMock.addPipeline(anotherOpenPipeline);
+
+    // Use up next 3 DNs also for a closed pipeline.
+    List<DatanodeDetails> closedPiplineDns = mockNodeManager.getAllNodes()
+        .subList(6, 9);
+    Pipeline anotherClosedPipeline = Pipeline.newBuilder()
+        .setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(factor)
+        .setNodes(closedPiplineDns)
+        .setState(Pipeline.PipelineState.CLOSED)
+        .setId(PipelineID.randomId())
+        .build();
+    stateManagerMock.addPipeline(anotherClosedPipeline);
+
+    Pipeline pipeline = providerMock.create(factor);
+    Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+    Assert.assertEquals(pipeline.getFactor(), factor);
+    Assert.assertEquals(Pipeline.PipelineState.ALLOCATED,
+        pipeline.getPipelineState());
+    Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+    List<DatanodeDetails> pipelineNodes = pipeline.getNodes();
+
+    // Pipeline nodes cannot be from open pipelines.
+    Assert.assertTrue(
+        pipelineNodes.parallelStream().filter(dn ->
+        (openPiplineDns.contains(dn) || moreOpenPiplineDns.contains(dn)))
+        .count() == 0);
+
+    // Since we have only 9 Healthy DNs, at least 1 pipeline node should have
+    // been from the closed pipeline DN list.
+    Assert.assertTrue(pipelineNodes.parallelStream().filter(
+        closedPiplineDns::contains).count() > 0);
+  }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 105d2e2..9e2cfa9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -62,6 +62,7 @@ public class TestSCMPipelineManager {
     testDir = GenericTestUtils
         .getTestDir(TestSCMPipelineManager.class.getSimpleName());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     boolean folderExisted = testDir.exists() || testDir.mkdirs();
     if (!folderExisted) {
       throw new IOException("Unable to create test directory path");
@@ -77,7 +78,7 @@ public class TestSCMPipelineManager {
   @Test
   public void testPipelineReload() throws IOException {
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -94,7 +95,7 @@ public class TestSCMPipelineManager {
 
     // new pipeline manager should be able to load the pipelines from the db
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -117,7 +118,7 @@ public class TestSCMPipelineManager {
   @Test
   public void testRemovePipeline() throws IOException {
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -135,7 +136,71 @@ public class TestSCMPipelineManager {
 
     // new pipeline manager should not be able to load removed pipelines
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
+    try {
+      pipelineManager.getPipeline(pipeline.getId());
+      Assert.fail("Pipeline should not have been retrieved");
+    } catch (IOException e) {
+      Assert.assertTrue(e.getMessage().contains("not found"));
+    }
+
+    // clean up
+    pipelineManager.close();
+  }
+
+  @Test
+  public void testPipelineReport() throws IOException {
+    EventQueue eventQueue = new EventQueue();
+    SCMPipelineManager pipelineManager =
+        new SCMPipelineManager(conf, nodeManager, eventQueue);
+    PipelineProvider mockRatisProvider =
+        new MockRatisPipelineProvider(nodeManager,
+            pipelineManager.getStateManager(), conf);
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        mockRatisProvider);
+
+    SCMSafeModeManager scmSafeModeManager =
+        new SCMSafeModeManager(conf, new ArrayList<>(), pipelineManager,
+            eventQueue);
+
+    // create a pipeline in allocated state with no dns yet reported
+    Pipeline pipeline = pipelineManager
+        .createPipeline(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE);
+
+    Assert
+        .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
+    Assert
+        .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isOpen());
+
+    // pipeline is not healthy until all dns report
+    PipelineReportHandler pipelineReportHandler =
+        new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+    List<DatanodeDetails> nodes = pipeline.getNodes();
+    Assert.assertFalse(
+        pipelineManager.getPipeline(pipeline.getId()).isHealthy());
+    // get pipeline report from each dn in the pipeline
+    nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
+        pipelineReportHandler, false, eventQueue));
+    sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
+        pipelineReportHandler, true, eventQueue);
+
+    // pipeline is healthy when all dns report
+    Assert
+        .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
+    // pipeline should now move to open state
+    Assert
+        .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
+
+    // close the pipeline
+    pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+
+    // pipeline report for destroyed pipeline should be ignored
+    nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
+        pipelineReportHandler, false, eventQueue));
+    sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
+        pipelineReportHandler, true, eventQueue);
+
     try {
       pipelineManager.getPipeline(pipeline.getId());
       Assert.fail("Pipeline should not have been retrieved");
@@ -152,7 +217,7 @@ public class TestSCMPipelineManager {
     MockNodeManager nodeManagerMock = new MockNodeManager(true,
         20);
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManagerMock, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManagerMock, new EventQueue());
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManagerMock,
             pipelineManager.getStateManager(), conf);
@@ -161,9 +226,9 @@ public class TestSCMPipelineManager {
 
     MetricsRecordBuilder metrics = getMetrics(
         SCMPipelineMetrics.class.getSimpleName());
-    long numPipelineCreated = getLongCounter("NumPipelineCreated",
+    long numPipelineAllocated = getLongCounter("NumPipelineAllocated",
         metrics);
-    Assert.assertTrue(numPipelineCreated == 0);
+    Assert.assertTrue(numPipelineAllocated == 0);
 
     // 3 DNs are unhealthy.
     // Create 5 pipelines (Use up 15 Datanodes)
@@ -176,8 +241,8 @@ public class TestSCMPipelineManager {
 
     metrics = getMetrics(
         SCMPipelineMetrics.class.getSimpleName());
-    numPipelineCreated = getLongCounter("NumPipelineCreated", metrics);
-    Assert.assertTrue(numPipelineCreated == 5);
+    numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+    Assert.assertTrue(numPipelineAllocated == 5);
 
     long numPipelineCreateFailed = getLongCounter(
         "NumPipelineCreationFailed", metrics);
@@ -196,8 +261,8 @@ public class TestSCMPipelineManager {
 
     metrics = getMetrics(
         SCMPipelineMetrics.class.getSimpleName());
-    numPipelineCreated = getLongCounter("NumPipelineCreated", metrics);
-    Assert.assertTrue(numPipelineCreated == 5);
+    numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+    Assert.assertTrue(numPipelineAllocated == 5);
 
     numPipelineCreateFailed = getLongCounter(
         "NumPipelineCreationFailed", metrics);
@@ -210,7 +275,7 @@ public class TestSCMPipelineManager {
   @Test
   public void testActivateDeactivatePipeline() throws IOException {
     final SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, new EventQueue(), null);
+        new SCMPipelineManager(conf, nodeManager, new EventQueue());
     final PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -257,7 +322,7 @@ public class TestSCMPipelineManager {
   public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
     EventQueue eventQueue = new EventQueue();
     SCMPipelineManager pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, eventQueue, null);
+        new SCMPipelineManager(conf, nodeManager, eventQueue);
     PipelineProvider mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
@@ -270,7 +335,7 @@ public class TestSCMPipelineManager {
     pipelineManager.close();
     // new pipeline manager loads the pipelines from the db in ALLOCATED state
     pipelineManager =
-        new SCMPipelineManager(conf, nodeManager, eventQueue, null);
+        new SCMPipelineManager(conf, nodeManager, eventQueue);
     mockRatisProvider =
         new MockRatisPipelineProvider(nodeManager,
             pipelineManager.getStateManager(), conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 59cef37..0102246 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -262,7 +262,7 @@ public interface MiniOzoneCluster {
     // Use relative smaller number of handlers for testing
     protected int numOfOmHandlers = 20;
     protected int numOfScmHandlers = 20;
-    protected int numOfDatanodes = 1;
+    protected int numOfDatanodes = 3;
     protected boolean  startDataNodes = true;
     protected CertificateClient certClient;
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 2813711..2ad6c12 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -28,8 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.safemode.HealthyPipelineSafeModeRule;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
@@ -67,7 +65,6 @@ import java.nio.file.Paths;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
@@ -98,7 +95,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
   private final List<HddsDatanodeService> hddsDatanodes;
 
   // Timeout for the cluster to be ready
-  private int waitForClusterToBeReadyTimeout = 60000; // 1 min
+  private int waitForClusterToBeReadyTimeout = 120000; // 2 min
   private CertificateClient caClient;
 
   /**
@@ -147,32 +144,17 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(() -> {
       final int healthy = scm.getNodeCount(HEALTHY);
-      boolean isReady = healthy == hddsDatanodes.size();
-      boolean printIsReadyMsg = true;
-      List<Pipeline> pipelines = scm.getPipelineManager().getPipelines();
-      if (!pipelines.isEmpty()) {
-        List<Pipeline> raftPipelines = pipelines.stream().filter(p ->
-            p.getType() == HddsProtos.ReplicationType.RATIS).collect(
-                Collectors.toList());
-        if (!raftPipelines.isEmpty()) {
-          List<Pipeline> notOpenPipelines = raftPipelines.stream().filter(p ->
-              p.getPipelineState() != Pipeline.PipelineState.OPEN &&
-                  p.getPipelineState() != Pipeline.PipelineState.CLOSED)
-              .collect(Collectors.toList());
-          if (notOpenPipelines.size() > 0) {
-            LOG.info("Waiting for {} number of pipelines out of {}, to report "
-                + "a leader.", notOpenPipelines.size(), raftPipelines.size());
-            isReady = false;
-            printIsReadyMsg = false;
-          }
-        }
-      }
-      if (printIsReadyMsg) {
-        LOG.info("{}. Got {} of {} DN Heartbeats.",
-            isReady ? "Cluster is ready" : "Waiting for cluster to be ready",
-            healthy, hddsDatanodes.size());
-      }
-      return isReady;
+      final boolean isNodeReady = healthy == hddsDatanodes.size();
+      final boolean exitSafeMode = !scm.isInSafeMode();
+
+      LOG.info("{}. Got {} of {} DN Heartbeats.",
+          isNodeReady? "Nodes are ready" : "Waiting for nodes to be ready",
+          healthy, hddsDatanodes.size());
+      LOG.info(exitSafeMode? "Cluster exits safe mode" :
+              "Waiting for cluster to exit safe mode",
+          healthy, hddsDatanodes.size());
+
+      return isNodeReady && exitSafeMode;
     }, 1000, waitForClusterToBeReadyTimeout);
   }
 
@@ -660,7 +642,6 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       if (hbInterval.isPresent()) {
         conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
             hbInterval.get(), TimeUnit.MILLISECONDS);
-
       } else {
         conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
             DEFAULT_HB_INTERVAL_MS,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
index cd975cf..eadb520 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerOperations.java
@@ -49,7 +49,7 @@ public class TestContainerOperations {
     ozoneConf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
         SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
     ozoneConf.setStorageSize(OZONE_SCM_CONTAINER_SIZE, 5, StorageUnit.GB);
-    cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(1).build();
+    cluster = MiniOzoneCluster.newBuilder(ozoneConf).setNumDatanodes(3).build();
     storageClient = new ContainerOperationClient(ozoneConf);
     cluster.waitForClusterToBeReady();
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
index 76eee6a..548f9b6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestContainerStateMachineIdempotency.java
@@ -61,7 +61,7 @@ public class TestContainerStateMachineIdempotency {
     ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
         SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
     cluster =
-        MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
+        MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(3).build();
     cluster.waitForClusterToBeReady();
     storageContainerLocationClient =
         cluster.getStorageContainerLocationClient();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index efc2736..99b4083 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -282,11 +282,11 @@ public class TestMiniOzoneCluster {
    * Test that a DN can register with SCM even if it was started before the SCM.
    * @throws Exception
    */
-  @Test (timeout = 300_000)
+  @Test (timeout = 60000)
   public void testDNstartAfterSCM() throws Exception {
-    // Start a cluster with 1 DN
+    // Start a cluster with 3 DNs
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(1)
+        .setNumDatanodes(3)
         .build();
     cluster.waitForClusterToBeReady();
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 86dd75a..706f880 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -19,8 +19,12 @@ package org.apache.hadoop.ozone;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic
     .NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_COMMAND_STATUS_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.argThat;
 import static org.mockito.Matchers.eq;
@@ -328,10 +332,12 @@ public class TestStorageContainerManager {
         100, TimeUnit.MILLISECONDS);
     conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
         numKeys);
+    conf.setBoolean(HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
     MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
         .setHbInterval(1000)
         .setHbProcessorInterval(3000)
+        .setNumDatanodes(1)
         .build();
     cluster.waitForClusterToBeReady();
 
@@ -458,7 +464,7 @@ public class TestStorageContainerManager {
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, scmPath.toString());
     //This will set the cluster id in the version file
     MiniOzoneCluster cluster =
-        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
     cluster.waitForClusterToBeReady();
     try {
       // This will initialize SCM
@@ -570,6 +576,7 @@ public class TestStorageContainerManager {
         100, TimeUnit.MILLISECONDS);
     conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT,
         numKeys);
+    conf.setBoolean(HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
     MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
         .setHbInterval(1000)
@@ -597,7 +604,7 @@ public class TestStorageContainerManager {
 
       scm.getContainerManager().updateContainerState(selectedContainer
           .containerID(), HddsProtos.LifeCycleEvent.FINALIZE);
-      cluster.restartStorageContainerManager(true);
+      cluster.restartStorageContainerManager(false);
       scm = cluster.getStorageContainerManager();
       EventPublisher publisher = mock(EventPublisher.class);
       ReplicationManager replicationManager = scm.getReplicationManager();
@@ -607,8 +614,7 @@ public class TestStorageContainerManager {
       modifiersField.setAccessible(true);
       modifiersField.setInt(f, f.getModifiers() & ~Modifier.FINAL);
       f.set(replicationManager, publisher);
-      scm.getReplicationManager().start();
-      Thread.sleep(2000);
+      Thread.sleep(10000);
 
       UUID dnUuid = cluster.getHddsDatanodes().iterator().next()
           .getDatanodeDetails().getUuid();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
index 623b11d..e6ed498 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBCSID.java
@@ -45,6 +45,8 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_SCM_SAFEMODE_PIPELINE_CREATION;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
     HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
@@ -81,6 +83,7 @@ public class TestBCSID {
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.setQuietMode(false);
+    conf.setBoolean(HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     cluster =
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
             .build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 82c4910..e4b34fb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -52,6 +52,8 @@ import java.util.concurrent.TimeUnit;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_SCM_SAFEMODE_MIN_PIPELINE;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
 
@@ -93,6 +95,7 @@ public class TestContainerStateMachine {
     OzoneManager.setTestSecureOmFlag(true);
     conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
     //  conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
+    conf.setInt(HDDS_SCM_SAFEMODE_MIN_PIPELINE, 0);
     cluster =
         MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1)
             .setHbInterval(200)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
index 0fb15d0..e6302e3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachineFailures.java
@@ -73,6 +73,8 @@ import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.
     HDDS_CONTAINER_REPORT_INTERVAL;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_PIPELINE_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
     ContainerDataProto.State.UNHEALTHY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.
@@ -118,6 +120,8 @@ public class TestContainerStateMachineFailures {
         TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
         TimeUnit.MILLISECONDS);
+    conf.setTimeDuration(HDDS_PIPELINE_REPORT_INTERVAL, 200,
+        TimeUnit.MILLISECONDS);
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.setTimeDuration(OZONE_SCM_PIPELINE_DESTROY_TIMEOUT, 10,
@@ -129,7 +133,7 @@ public class TestContainerStateMachineFailures {
     conf.setLong(OzoneConfigKeys.DFS_RATIS_SNAPSHOT_THRESHOLD_KEY, 1);
     conf.setQuietMode(false);
     cluster =
-        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).setHbInterval(200)
+        MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(200)
             .build();
     cluster.waitForClusterToBeReady();
     //the easiest way to create an open container is creating a key
@@ -576,6 +580,7 @@ public class TestContainerStateMachineFailures {
     Assert.assertTrue(!dispatcher.getMissingContainerSet().isEmpty());
     Assert
         .assertTrue(dispatcher.getMissingContainerSet().contains(containerID));
+
     // write a new key
     key = objectStore.getVolume(volumeName).getBucket(bucketName)
         .createKey("ratis", 1024, ReplicationType.RATIS, ReplicationFactor.ONE,
@@ -599,7 +604,7 @@ public class TestContainerStateMachineFailures {
     byte[] blockCommitSequenceIdKey =
         DFSUtil.string2Bytes(OzoneConsts.BLOCK_COMMIT_SEQUENCE_ID_PREFIX);
 
-    // modify the bcsid for the container in the ROCKS DB tereby inducing
+    // modify the bcsid for the container in the ROCKS DB thereby inducing
     // corruption
     db.getStore().put(blockCommitSequenceIdKey, Longs.toByteArray(0));
     db.decrementReference();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
index 9768943..8310183 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
@@ -73,7 +73,7 @@ public class TestContainerReplication {
   @Before
   public void setup() throws Exception {
     conf = newOzoneConfiguration();
-    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(2)
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3)
         .setRandomContainerPort(true).build();
   }
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
index c7b7992..ffe5b6f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestBlockDeletion.java
@@ -112,7 +112,7 @@ public class TestBlockDeletion {
         3, TimeUnit.SECONDS);
     conf.setQuietMode(false);
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(1)
+        .setNumDatanodes(3)
         .setHbInterval(200)
         .build();
     cluster.waitForClusterToBeReady();
@@ -143,7 +143,7 @@ public class TestBlockDeletion {
     String keyName = UUID.randomUUID().toString();
 
     OzoneOutputStream out = bucket.createKey(keyName, value.getBytes().length,
-        ReplicationType.RATIS, ReplicationFactor.ONE, new HashMap<>());
+        ReplicationType.RATIS, ReplicationFactor.THREE, new HashMap<>());
     for (int i = 0; i < 100; i++) {
       out.write(value.getBytes());
     }
@@ -152,7 +152,7 @@ public class TestBlockDeletion {
     OmKeyArgs keyArgs = new OmKeyArgs.Builder().setVolumeName(volumeName)
         .setBucketName(bucketName).setKeyName(keyName).setDataSize(0)
         .setType(HddsProtos.ReplicationType.RATIS)
-        .setFactor(HddsProtos.ReplicationFactor.ONE)
+        .setFactor(HddsProtos.ReplicationFactor.THREE)
         .setRefreshPipeline(true)
         .build();
     List<OmKeyLocationInfoGroup> omKeyLocationInfoGroupList =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
index 5c7f2c1..3320f94 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
 import java.util.HashMap;
 
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -58,6 +59,7 @@ public class TestCloseContainerHandler {
     //setup a cluster (1G free space is enough for a unit test)
     conf = new OzoneConfiguration();
     conf.set(OZONE_SCM_CONTAINER_SIZE, "1GB");
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(1).build();
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
index 1cbf69e..4df6f69 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -66,6 +67,7 @@ public class TestDeleteContainerHandler {
   public static void setup() throws Exception {
     conf = new OzoneConfiguration();
     conf.set(OZONE_SCM_CONTAINER_SIZE, "1GB");
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(1).build();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
index 7fb9825..1d04330 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/dn/scrubber/TestDataScrubber.java
@@ -85,6 +85,8 @@ public class TestDataScrubber {
     ozoneConfig.set(HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL, "1s");
     ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
         SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
+    ozoneConfig.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        false);
     cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1)
         .build();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
index 732fb34..e1e0cfa 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestKeyPurging.java
@@ -67,7 +67,7 @@ public class TestKeyPurging {
     conf.setQuietMode(false);
 
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(1)
+        .setNumDatanodes(3)
         .setHbInterval(200)
         .build();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java
index 3614a05..1c1f034 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestScmSafeMode.java
@@ -341,7 +341,7 @@ public class TestScmSafeMode {
     builder = MiniOzoneCluster.newBuilder(conf)
         .setHbInterval(1000)
         .setHbProcessorInterval(500)
-        .setNumDatanodes(1);
+        .setNumDatanodes(3);
     cluster = builder.build();
     StorageContainerManager scm = cluster.getStorageContainerManager();
     assertFalse(scm.isInSafeMode());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
index 48ce4a6..9d187ff 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestContainerSmallFile.java
@@ -61,7 +61,7 @@ public class TestContainerSmallFile {
     ozoneConfig = new OzoneConfiguration();
     ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
         SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
-    cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1)
+    cluster = MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(3)
         .build();
     cluster.waitForClusterToBeReady();
     storageContainerLocationClient = cluster
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
index b19020f..2f8c755 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestGetCommittedBlockLengthAndPutKey.java
@@ -65,7 +65,7 @@ public class TestGetCommittedBlockLengthAndPutKey {
     ozoneConfig.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
         SCMContainerPlacementCapacity.class, ContainerPlacementPolicy.class);
     cluster =
-        MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(1).build();
+        MiniOzoneCluster.newBuilder(ozoneConfig).setNumDatanodes(3).build();
     cluster.waitForClusterToBeReady();
     storageContainerLocationClient =
         cluster.getStorageContainerLocationClient();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
index e700a0e..59a28f7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMMXBean.java
@@ -57,7 +57,7 @@ import static org.junit.Assert.fail;
 public class TestSCMMXBean {
 
   public static final Log LOG = LogFactory.getLog(TestSCMMXBean.class);
-  private static int numOfDatanodes = 1;
+  private static int numOfDatanodes = 3;
   private static MiniOzoneCluster cluster;
   private static OzoneConfiguration conf;
   private static StorageContainerManager scm;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
index 65a6357..4aa1eae 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestSCMNodeMetrics.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.scm.node;
 
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
@@ -49,7 +50,8 @@ public class TestSCMNodeMetrics {
   @Before
   public void setup() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
-    cluster = MiniOzoneCluster.newBuilder(conf).build();
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+    cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(1).build();
     cluster.waitForClusterToBeReady();
   }
 
diff --git a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsRenameDir.java b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsRenameDir.java
index 1d58465..2b95cbd 100644
--- a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsRenameDir.java
+++ b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsRenameDir.java
@@ -52,7 +52,7 @@ public class TestOzoneFsRenameDir {
   public void init() throws Exception {
     conf = new OzoneConfiguration();
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(1)
+        .setNumDatanodes(3)
         .build();
     cluster.waitForClusterToBeReady();
 
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java
index 112674a..b6a8f18 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/fsck/TestContainerMapper.java
@@ -65,7 +65,7 @@ public class TestContainerMapper {
     conf.set(OZONE_OM_DB_DIRS, dbPath);
     conf.set(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, "100MB");
     cluster = MiniOzoneCluster.newBuilder(conf)
-        .setNumDatanodes(1)
+        .setNumDatanodes(3)
         .setScmId(SCM_ID)
         .build();
     cluster.waitForClusterToBeReady();


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org