Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2019/10/13 06:53:05 UTC

[hadoop-ozone] 01/03: HDDS-2034. Sync RATIS pipeline creation and destruction through heartbeat commands.

This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch HDDS-2034
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit f7b9ad365cd811b785db5a738241f139832b1fc2
Author: Sammi Chen <sa...@apache.org>
AuthorDate: Mon Sep 23 20:24:19 2019 +0800

    HDDS-2034. Sync RATIS pipeline creation and destruction through heartbeat commands.
---
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |  12 +-
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  |   7 +
 .../common/src/main/resources/ozone-default.xml    |  30 ++-
 hadoop-hdds/container-service/pom.xml              |   5 +
 .../common/statemachine/DatanodeStateMachine.java  |  14 ++
 .../common/statemachine/StateContext.java          |  10 +
 .../CloseContainerCommandHandler.java              |  11 +-
 .../ClosePipelineCommandHandler.java               | 166 +++++++++++++++
 .../commandhandler/CommandHandler.java             |   2 +-
 .../CreatePipelineCommandHandler.java              | 228 +++++++++++++++++++++
 .../states/endpoint/HeartbeatEndpointTask.java     |  22 ++
 .../protocol/commands/ClosePipelineCommand.java    |  73 +++++++
 .../protocol/commands/CreatePipelineCommand.java   | 100 +++++++++
 .../commands/CreatePipelineCommandStatus.java      |  97 +++++++++
 .../proto/StorageContainerDatanodeProtocol.proto   |  31 +++
 .../hadoop/hdds/scm/block/BlockManagerImpl.java    |  29 +++
 .../scm/command/CommandStatusReportHandler.java    |  24 ++-
 .../apache/hadoop/hdds/scm/events/SCMEvents.java   |  22 +-
 .../hadoop/hdds/scm/pipeline/PipelineFactory.java  |  13 +-
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |  11 +-
 .../hadoop/hdds/scm/pipeline/PipelineProvider.java |   2 +
 .../hdds/scm/pipeline/PipelineReportHandler.java   |  32 +--
 .../hdds/scm/pipeline/PipelineStateManager.java    |   2 +-
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 124 ++++-------
 .../hdds/scm/pipeline/RatisPipelineUtils.java      | 101 ---------
 .../hdds/scm/pipeline/SCMPipelineManager.java      | 119 +++++++++--
 .../hdds/scm/pipeline/SCMPipelineMetrics.java      |  11 +
 .../hdds/scm/pipeline/SimplePipelineProvider.java  |   5 +
 .../scm/safemode/HealthyPipelineSafeModeRule.java  |  97 ++++-----
 .../safemode/OneReplicaPipelineSafeModeRule.java   |  64 ++----
 .../hdds/scm/safemode/SCMSafeModeManager.java      |  13 +-
 .../hadoop/hdds/scm/safemode/SafeModeHandler.java  |   5 +-
 .../scm/server/SCMDatanodeHeartbeatDispatcher.java |   1 +
 .../hdds/scm/server/SCMDatanodeProtocolServer.java |  20 ++
 .../hdds/scm/server/StorageContainerManager.java   |   4 +-
 .../java/org/apache/hadoop/hdds/scm/TestUtils.java |  34 +++
 .../container/TestCloseContainerEventHandler.java  |   3 +
 .../hadoop/hdds/scm/node/TestDeadNodeHandler.java  |  11 +-
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   |   9 +-
 .../scm/pipeline/MockRatisPipelineProvider.java    |   3 +-
 .../safemode/TestHealthyPipelineSafeModeRule.java  |  39 +---
 .../TestOneReplicaPipelineSafeModeRule.java        |  33 +--
 .../hdds/scm/safemode/TestSCMSafeModeManager.java  |  36 ++--
 .../TestContainerStateManagerIntegration.java      |   4 +-
 .../hdds/scm/pipeline/TestPipelineClose.java       |   2 +-
 .../scm/pipeline/TestRatisPipelineProvider.java    |  15 +-
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |  46 ++---
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  16 +-
 48 files changed, 1265 insertions(+), 493 deletions(-)
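
At a high level, the patch wires pipeline creation and teardown into the
existing heartbeat command loop: SCM queues an SCMCommand for a datanode, the
next heartbeat response carries it back, and the datanode's CommandDispatcher
routes it to the matching handler. A minimal sketch of that loop, assuming it
mirrors the existing StateContext/CommandDispatcher flow (decodeFromResponse
is a hypothetical helper, not a real method):

    // Sketch (simplified names) of the heartbeat-driven command flow this
    // patch extends; the real wiring is in HeartbeatEndpointTask and
    // DatanodeStateMachine below.
    SCMCommand cmd = decodeFromResponse(commandResponseProto); // hypothetical
    context.addCommand(cmd);            // queued when the heartbeat returns
    SCMCommand next = context.getNextCommand();
    if (next != null) {
      commandDispatcher.handle(next);   // routed by SCMCommandProto.Type
    }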

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 99972ae..5e161b3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -81,7 +81,12 @@ public final class HddsConfigKeys {
   public static final String HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK =
       "hdds.scm.safemode.pipeline-availability.check";
   public static final boolean
-      HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = false;
+      HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT = true;
+
+  public static final String HDDS_SCM_SAFEMODE_PIPELINE_CREATION =
+      "hdds.scm.safemode.pipeline.creation";
+  public static final boolean
+      HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT = true;
 
   // % of containers which should have at least one reported replica
   // before SCM comes out of safe mode.
@@ -89,13 +94,16 @@ public final class HddsConfigKeys {
       "hdds.scm.safemode.threshold.pct";
   public static final double HDDS_SCM_SAFEMODE_THRESHOLD_PCT_DEFAULT = 0.99;
 
-
   // percentage of healthy pipelines, where all 3 datanodes are reported in the
   // pipeline.
   public static final String HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT =
       "hdds.scm.safemode.healthy.pipelie.pct";
   public static final double
       HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT = 0.10;
+  // minimum number of healthy RATIS pipelines (factor ONE or THREE)
+  // required to exit safe mode
+  public static final String HDDS_SCM_SAFEMODE_MIN_PIPELINE =
+      "hdds.scm.safemode.min.pipeline";
+  public static final int HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT = 1;
 
   public static final String HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT =
       "hdds.scm.safemode.atleast.one.node.reported.pipeline.pct";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 1627569..9d4b728 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -51,6 +51,7 @@ public final class Pipeline {
   private Map<DatanodeDetails, Long> nodeStatus;
   // nodes with ordered distance to client
   private ThreadLocal<List<DatanodeDetails>> nodesInOrder = new ThreadLocal<>();
+  private final long creationTime;
 
   /**
    * The immutable properties of pipeline object is used in
@@ -65,6 +66,7 @@ public final class Pipeline {
     this.factor = factor;
     this.state = state;
     this.nodeStatus = nodeStatus;
+    this.creationTime = System.currentTimeMillis();
   }
 
   /**
@@ -135,6 +137,11 @@ public final class Pipeline {
     return state == PipelineState.OPEN;
   }
 
+  public boolean isAllocationTimeout() {
+    //TODO: define a system property to control the timeout value
+    return false;
+  }
+
   public void setNodesInOrder(List<DatanodeDetails> nodes) {
     nodesInOrder.set(nodes);
   }
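
isAllocationTimeout() is left as a stub, but the new creationTime field gives
it everything it needs once the TODO's timeout property exists. A hedged
sketch of what the check could look like, assuming a hypothetical
allocationTimeoutMs value plumbed in from configuration:

    // Hypothetical completion of the stub above; allocationTimeoutMs is an
    // assumed, not-yet-existing configuration value.
    public boolean isAllocationTimeout(long allocationTimeoutMs) {
      return state == PipelineState.ALLOCATED
          && System.currentTimeMillis() - creationTime > allocationTimeoutMs;
    }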
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a038047..1d592fa 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -311,15 +311,6 @@
       defined with postfix (ns,ms,s,m,h,d)</description>
   </property>
   <property>
-    <name>hdds.command.status.report.interval</name>
-    <value>60000ms</value>
-    <tag>OZONE, CONTAINER, MANAGEMENT</tag>
-    <description>Time interval of the datanode to send status of command
-      execution. Each datanode periodically the execution status of commands
-      received from SCM to SCM. Unit could be defined with postfix
-      (ns,ms,s,m,h,d)</description>
-  </property>
-  <property>
     <name>hdds.pipeline.report.interval</name>
     <value>60000ms</value>
     <tag>OZONE, PIPELINE, MANAGEMENT</tag>
@@ -1315,7 +1306,7 @@
 
   <property>
     <name>hdds.scm.safemode.pipeline-availability.check</name>
-    <value>false</value>
+    <value>true</value>
     <tag>HDDS,SCM,OPERATION</tag>
     <description>
       Boolean value to enable pipeline availability check during SCM safe mode.
@@ -1401,6 +1392,25 @@
   </property>
 
   <property>
+    <name>hdds.scm.safemode.pipeline.creation</name>
+    <value>true</value>
+    <tag>HDDS,SCM,OPERATION</tag>
+    <description>
+      Boolean value to enable background pipeline creation in SCM safe mode.
+    </description>
+  </property>
+
+  <property>
+    <name>hdds.scm.safemode.min.pipeline</name>
+    <value>1</value>
+    <tag>HDDS,SCM,OPERATION</tag>
+    <description>
+      Minimum number of RATIS pipelines required to exit SCM safe mode.
+      Considered only when "hdds.scm.safemode.pipeline.creation" is true.
+    </description>
+  </property>
+
+  <property>
     <name>hdds.lock.max.concurrency</name>
     <value>100</value>
     <tag>HDDS</tag>
diff --git a/hadoop-hdds/container-service/pom.xml b/hadoop-hdds/container-service/pom.xml
index 2f89fa2..bb85f3c 100644
--- a/hadoop-hdds/container-service/pom.xml
+++ b/hadoop-hdds/container-service/pom.xml
@@ -38,6 +38,11 @@ https://maven.apache.org/xsd/maven-4.0.0.xsd">
       <artifactId>hadoop-hdds-server-framework</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdds-client</artifactId>
+      <version>${hdds.version}</version>
+    </dependency>
+    <dependency>
       <groupId>io.dropwizard.metrics</groupId>
       <artifactId>metrics-core</artifactId>
     </dependency>
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index c9eb702..926f19c 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -39,8 +39,12 @@ import org.apache.hadoop.ozone.container.common.report.ReportManager;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CloseContainerCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .ClosePipelineCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CommandDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+    .CreatePipelineCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .DeleteBlocksCommandHandler;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .DeleteContainerCommandHandler;
@@ -126,6 +130,8 @@ public class DatanodeStateMachine implements Closeable {
             conf))
         .addHandler(new ReplicateContainerCommandHandler(conf, supervisor))
         .addHandler(new DeleteContainerCommandHandler())
+        .addHandler(new ClosePipelineCommandHandler())
+        .addHandler(new CreatePipelineCommandHandler())
         .setConnectionManager(connectionManager)
         .setContainer(container)
         .setContext(context)
@@ -486,4 +492,12 @@ public class DatanodeStateMachine implements Closeable {
   public ReplicationSupervisor getSupervisor() {
     return supervisor;
   }
+
+  public Configuration getConf() {
+    return conf;
+  }
+
+  public CertificateClient getCertificateClient() {
+    return dnCertClient;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 2c01f3a..17e5502 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -37,6 +37,8 @@ import org.apache.hadoop.ozone.container.common.states.datanode
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.ozone.protocol.commands
     .DeleteBlockCommandStatus.DeleteBlockCommandStatusBuilder;
+import org.apache.hadoop.ozone.protocol.commands
+    .CreatePipelineCommandStatus.CreatePipelineCommandStatusBuilder;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
 import static java.lang.Math.min;
@@ -464,6 +466,14 @@ public class StateContext {
               .setType(cmd.getType())
               .build());
     }
+    if (cmd.getType() == SCMCommandProto.Type.createPipelineCommand) {
+      addCmdStatus(cmd.getId(),
+          CreatePipelineCommandStatusBuilder.newBuilder()
+              .setCmdId(cmd.getId())
+              .setStatus(Status.PENDING)
+              .setType(cmd.getType())
+              .build());
+    }
   }
 
   /**
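
The status registered here starts as PENDING; CreatePipelineCommandHandler
(added below) later flips it through updateCommandStatus, and SCM's
CommandStatusReportHandler only fires CREATE_PIPELINE_STATUS once the status
has left PENDING, so SCM reacts exactly once per command outcome.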
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 881fea0..f7b80cd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -39,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Handler for close container command received from SCM.
@@ -48,7 +49,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
   private static final Logger LOG =
       LoggerFactory.getLogger(CloseContainerCommandHandler.class);
 
-  private int invocationCount;
+  private AtomicLong invocationCount = new AtomicLong(0);
   private long totalTime;
 
   /**
@@ -69,7 +70,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
   public void handle(SCMCommand command, OzoneContainer ozoneContainer,
       StateContext context, SCMConnectionManager connectionManager) {
     LOG.debug("Processing Close Container command.");
-    invocationCount++;
+    invocationCount.incrementAndGet();
     final long startTime = Time.monotonicNow();
     final DatanodeDetails datanodeDetails = context.getParent()
         .getDatanodeDetails();
@@ -159,7 +160,7 @@ public class CloseContainerCommandHandler implements CommandHandler {
    */
   @Override
   public int getInvocationCount() {
-    return invocationCount;
+    return (int)invocationCount.get();
   }
 
   /**
@@ -169,8 +170,8 @@ public class CloseContainerCommandHandler implements CommandHandler {
    */
   @Override
   public long getAverageRunTime() {
-    if (invocationCount > 0) {
-      return totalTime / invocationCount;
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
     }
     return 0;
   }
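
One caveat worth noting: invocationCount becomes an AtomicLong here, but
totalTime stays a plain long that is still updated without synchronization.
If handlers can run concurrently, a java.util.concurrent.atomic.LongAdder
would close that gap; a sketch only, not part of this patch:

    // Sketch: make the time accumulator thread-safe too.
    private final LongAdder totalTime = new LongAdder();

    private void recordRun(long startTime) {
      totalTime.add(Time.monotonicNow() - startTime);
    }

    @Override
    public long getAverageRunTime() {
      long count = invocationCount.get();
      return count > 0 ? totalTime.sum() / count : 0;
    }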
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
new file mode 100644
index 0000000..bd48839
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client
+    .CertificateClient;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Handler for close pipeline command received from SCM.
+ */
+public class ClosePipelineCommandHandler implements CommandHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ClosePipelineCommandHandler.class);
+
+  private AtomicLong invocationCount = new AtomicLong(0);
+  private long totalTime;
+
+  /**
+   * Constructs a closePipelineCommand handler.
+   */
+  public ClosePipelineCommandHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command           - SCM Command
+   * @param ozoneContainer    - Ozone Container.
+   * @param context           - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+      StateContext context, SCMConnectionManager connectionManager) {
+    invocationCount.incrementAndGet();
+    final long startTime = Time.monotonicNow();
+    final DatanodeDetails datanode = context.getParent()
+        .getDatanodeDetails();
+    final ClosePipelineCommandProto closeCommand =
+        ((ClosePipelineCommand)command).getProto();
+    final PipelineID pipelineID = PipelineID.getFromProtobuf(
+        closeCommand.getPipelineID());
+
+    try {
+      destroyPipeline(datanode, pipelineID, context);
+      LOG.info("Close Pipeline #{} command on datanode #{}.", pipelineID,
+          datanode.getUuidString());
+    } catch (IOException e) {
+      LOG.error("Can't close pipeline #{}", pipelineID, e);
+    } finally {
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
+
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.closePipelineCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return (int)invocationCount.get();
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
+    }
+    return 0;
+  }
+
+  /**
+   * Destroy pipeline on this datanode.
+   *
+   * @param dn         - Datanode on which pipeline needs to be destroyed
+   * @param pipelineID - ID of pipeline to be destroyed
+   * @param context    - Ozone datanode context
+   * @throws IOException
+   */
+  void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
+      StateContext context) throws IOException {
+    final Configuration ozoneConf = context.getParent().getConf();
+    final String rpcType = ozoneConf
+        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
+    final RaftPeer p = RatisHelper.toRaftPeer(dn);
+    final int maxOutstandingRequests =
+        HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
+    final CertificateClient dnCertClient =
+        context.getParent().getCertificateClient();
+    final GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfigForDN(
+        new SecurityConfig(ozoneConf), dnCertClient);
+    final TimeDuration requestTimeout =
+        RatisHelper.getClientRequestTimeout(ozoneConf);
+    try(RaftClient client = RatisHelper
+        .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
+            retryPolicy, maxOutstandingRequests, tlsConfig, requestTimeout)) {
+      client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()),
+          true, p.getId());
+    }
+  }
+}
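
For context on the groupRemove call above: in Ratis it takes the Raft group
id, a flag controlling whether the group's storage directory is also deleted,
and the peer to apply the removal to. With the flag set to true, the datanode
both leaves the Raft ring and cleans up the pipeline's data on disk (assuming
the usual Ratis semantics of that deleteDirectory flag).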
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
index 1ea0ea8..dca02f6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -68,7 +68,7 @@ public interface CommandHandler {
   default void updateCommandStatus(StateContext context, SCMCommand command,
       Consumer<CommandStatus> cmdStatusUpdater, Logger log) {
     if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
-      log.debug("{} with Id:{} not found.", command.getType(),
+      log.warn("{} with Id:{} not found.", command.getType(),
           command.getId());
     }
   }
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
new file mode 100644
index 0000000..e6d20e7
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CreatePipelineACKProto;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client
+    .CertificateClient;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommandStatus;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Handler for create pipeline command received from SCM.
+ */
+public class CreatePipelineCommandHandler implements CommandHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(CreatePipelineCommandHandler.class);
+
+  private AtomicLong invocationCount = new AtomicLong(0);
+  private long totalTime;
+
+  /**
+   * Constructs a createPipelineCommand handler.
+   */
+  public CreatePipelineCommandHandler() {
+  }
+
+  /**
+   * Handles a given SCM command.
+   *
+   * @param command           - SCM Command
+   * @param ozoneContainer    - Ozone Container.
+   * @param context           - Current Context.
+   * @param connectionManager - The SCMs that we are talking to.
+   */
+  @Override
+  public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+      StateContext context, SCMConnectionManager connectionManager) {
+    invocationCount.incrementAndGet();
+    final long startTime = Time.monotonicNow();
+    final DatanodeDetails dn = context.getParent()
+        .getDatanodeDetails();
+    final CreatePipelineCommandProto createCommand =
+        ((CreatePipelineCommand)command).getProto();
+    final PipelineID pipelineID = PipelineID.getFromProtobuf(
+        createCommand.getPipelineID());
+    Collection<DatanodeDetails> peers =
+        createCommand.getDatanodeList().stream()
+            .map(DatanodeDetails::getFromProtoBuf)
+            .collect(Collectors.toList());
+
+    final CreatePipelineACKProto createPipelineACK =
+        CreatePipelineACKProto.newBuilder()
+            .setPipelineID(createCommand.getPipelineID())
+            .setDatanodeUUID(dn.getUuidString()).build();
+    boolean success = false;
+    try {
+      createPipeline(dn, pipelineID, peers, context);
+      success = true;
+      LOG.info("Create Pipeline {} {} #{} command succeed on datanode {}.",
+          createCommand.getType(), createCommand.getFactor(), pipelineID,
+          dn.getUuidString());
+    } catch (NotLeaderException e) {
+      LOG.debug("Follower cannot create pipeline #{}.", pipelineID);
+    } catch (IOException e) {
+      LOG.error("Can't create pipeline {} {} #{}", createCommand.getType(),
+          createCommand.getFactor(), pipelineID, e);
+    } finally {
+      final boolean cmdExecuted = success;
+      Consumer<CommandStatus> statusUpdater = (cmdStatus) -> {
+        cmdStatus.setStatus(cmdExecuted);
+        ((CreatePipelineCommandStatus)cmdStatus)
+            .setCreatePipelineAck(createPipelineACK);
+      };
+      updateCommandStatus(context, command, statusUpdater, LOG);
+      long endTime = Time.monotonicNow();
+      totalTime += endTime - startTime;
+    }
+  }
+
+  /**
+   * Returns the command type that this command handler handles.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getCommandType() {
+    return SCMCommandProto.Type.createPipelineCommand;
+  }
+
+  /**
+   * Returns number of times this handler has been invoked.
+   *
+   * @return int
+   */
+  @Override
+  public int getInvocationCount() {
+    return (int)invocationCount.get();
+  }
+
+  /**
+   * Returns the average time this function takes to run.
+   *
+   * @return long
+   */
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount.get() > 0) {
+      return totalTime / invocationCount.get();
+    }
+    return 0;
+  }
+
+  /**
+   * Sends ratis command to create pipeline on this datanode.
+   *
+   * @param dn       - this datanode
+   * @param pipelineId   - pipeline ID
+   * @param peers        - datanodes of the pipeline
+   * @param context      - Ozone datanode context
+   * @throws IOException
+   */
+  private void createPipeline(DatanodeDetails dn, PipelineID pipelineId,
+      Collection<DatanodeDetails> peers, StateContext context)
+      throws IOException {
+    final Configuration ozoneConf = context.getParent().getConf();
+    final RaftGroup group = RatisHelper.newRaftGroup(
+        RaftGroupId.valueOf(pipelineId.getId()), peers);
+    LOG.debug("creating pipeline:#{} with {}", pipelineId, group);
+
+    final String rpcType = ozoneConf
+        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
+    final List<IOException> exceptions =
+        Collections.synchronizedList(new ArrayList<>());
+    final int maxOutstandingRequests =
+        HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
+    final CertificateClient dnCertClient =
+        context.getParent().getCertificateClient();
+    final GrpcTlsConfig tlsConfig = RatisHelper.createTlsServerConfigForDN(
+        new SecurityConfig(ozoneConf), dnCertClient);
+    final TimeDuration requestTimeout =
+        RatisHelper.getClientRequestTimeout(ozoneConf);
+    try {
+      final RaftPeer peer = RatisHelper.toRaftPeer(dn);
+      try (RaftClient client = RatisHelper
+          .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), peer,
+              retryPolicy, maxOutstandingRequests, tlsConfig,
+              requestTimeout)) {
+
+        RaftClientReply reply = client.groupAdd(group, peer.getId());
+        if (reply == null || !reply.isSuccess()) {
+          String msg = "Pipeline initialization failed for pipeline:"
+              + pipelineId.getId() + " node:" + peer.getId();
+          throw new IOException(msg);
+        }
+      } catch (IOException ioe) {
+        String errMsg =
+            "Failed invoke Ratis rpc for " + dn.getUuid();
+        exceptions.add(new IOException(errMsg, ioe));
+      }
+    } catch (RejectedExecutionException ex) {
+      throw new IOException(ex.getClass().getName() + " exception occurred " +
+          "during createPipeline", ex);
+    }
+
+    if (!exceptions.isEmpty()) {
+      throw MultipleIOException.createIOException(exceptions);
+    }
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index c50f457..a55d0d6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -46,6 +46,8 @@ import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine.EndPointStates;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -309,6 +311,26 @@ public class HeartbeatEndpointTask
         }
         this.context.addCommand(deleteContainerCommand);
         break;
+      case createPipelineCommand:
+        CreatePipelineCommand createPipelineCommand =
+            CreatePipelineCommand.getFromProtobuf(
+                commandResponseProto.getCreatePipelineCommandProto());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received SCM create pipeline request {}",
+              createPipelineCommand.getPipelineID());
+        }
+        this.context.addCommand(createPipelineCommand);
+        break;
+      case closePipelineCommand:
+        ClosePipelineCommand closePipelineCommand =
+            ClosePipelineCommand.getFromProtobuf(
+                commandResponseProto.getClosePipelineCommandProto());
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Received SCM close pipeline request {}",
+              closePipelineCommand.getPipelineID());
+        }
+        this.context.addCommand(closePipelineCommand);
+        break;
       default:
         throw new IllegalArgumentException("Unknown response : "
             + commandResponseProto.getCommandType().name());
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java
new file mode 100644
index 0000000..1f75bc3
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ClosePipelineCommand.java
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ClosePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+
+/**
+ * Asks datanode to close a pipeline.
+ */
+public class ClosePipelineCommand
+    extends SCMCommand<ClosePipelineCommandProto> {
+
+  private final PipelineID pipelineID;
+
+  public ClosePipelineCommand(final PipelineID pipelineID) {
+    super();
+    this.pipelineID = pipelineID;
+  }
+
+  public ClosePipelineCommand(long cmdId, final PipelineID pipelineID) {
+    super(cmdId);
+    this.pipelineID = pipelineID;
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.closePipelineCommand;
+  }
+
+  @Override
+  public ClosePipelineCommandProto getProto() {
+    ClosePipelineCommandProto.Builder builder =
+        ClosePipelineCommandProto.newBuilder();
+    builder.setCmdId(getId());
+    builder.setPipelineID(pipelineID.getProtobuf());
+    return builder.build();
+  }
+
+  public static ClosePipelineCommand getFromProtobuf(
+      ClosePipelineCommandProto createPipelineProto) {
+    Preconditions.checkNotNull(createPipelineProto);
+    return new ClosePipelineCommand(createPipelineProto.getCmdId(),
+        PipelineID.getFromProtobuf(createPipelineProto.getPipelineID()));
+  }
+
+  public PipelineID getPipelineID() {
+    return pipelineID;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
new file mode 100644
index 0000000..9e22cbc
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommand.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CreatePipelineCommandProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Asks datanode to create a pipeline.
+ */
+public class CreatePipelineCommand
+    extends SCMCommand<CreatePipelineCommandProto> {
+
+  private final PipelineID pipelineID;
+  private final ReplicationFactor factor;
+  private final ReplicationType type;
+  private final List<DatanodeDetails> nodelist;
+
+  public CreatePipelineCommand(final PipelineID pipelineID,
+      final ReplicationType type, final ReplicationFactor factor,
+      final List<DatanodeDetails> datanodeList) {
+    super();
+    this.pipelineID = pipelineID;
+    this.factor = factor;
+    this.type = type;
+    this.nodelist = datanodeList;
+  }
+
+  public CreatePipelineCommand(long cmdId, final PipelineID pipelineID,
+      final ReplicationType type, final ReplicationFactor factor,
+      final List<DatanodeDetails> datanodeList) {
+    super(cmdId);
+    this.pipelineID = pipelineID;
+    this.factor = factor;
+    this.type = type;
+    this.nodelist = datanodeList;
+  }
+
+  /**
+   * Returns the type of this command.
+   *
+   * @return Type
+   */
+  @Override
+  public SCMCommandProto.Type getType() {
+    return SCMCommandProto.Type.createPipelineCommand;
+  }
+
+  @Override
+  public CreatePipelineCommandProto getProto() {
+    return CreatePipelineCommandProto.newBuilder()
+        .setCmdId(getId())
+        .setPipelineID(pipelineID.getProtobuf())
+        .setFactor(factor)
+        .setType(type)
+        .addAllDatanode(nodelist.stream()
+            .map(DatanodeDetails::getProtoBufMessage)
+            .collect(Collectors.toList()))
+        .build();
+  }
+
+  public static CreatePipelineCommand getFromProtobuf(
+      CreatePipelineCommandProto createPipelineProto) {
+    Preconditions.checkNotNull(createPipelineProto);
+    return new CreatePipelineCommand(createPipelineProto.getCmdId(),
+        PipelineID.getFromProtobuf(createPipelineProto.getPipelineID()),
+        createPipelineProto.getType(), createPipelineProto.getFactor(),
+        createPipelineProto.getDatanodeList().stream()
+            .map(DatanodeDetails::getFromProtoBuf)
+            .collect(Collectors.toList()));
+  }
+
+  public PipelineID getPipelineID() {
+    return pipelineID;
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommandStatus.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommandStatus.java
new file mode 100644
index 0000000..b5e7d6c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CreatePipelineCommandStatus.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CreatePipelineACKProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+/**
+ * Command status to report about pipeline creation.
+ */
+public class CreatePipelineCommandStatus extends CommandStatus {
+
+  private CreatePipelineACKProto createPipelineAck;
+
+  public CreatePipelineCommandStatus(Type type, Long cmdId, Status status,
+      String msg, CreatePipelineACKProto ack) {
+    super(type, cmdId, status, msg);
+    this.createPipelineAck = ack;
+  }
+
+  public void setCreatePipelineAck(CreatePipelineACKProto ack) {
+    createPipelineAck = ack;
+  }
+
+  @Override
+  public CommandStatus getFromProtoBuf(
+      StorageContainerDatanodeProtocolProtos.CommandStatus cmdStatusProto) {
+    return CreatePipelineCommandStatusBuilder.newBuilder()
+        .setCreatePipelineAck(cmdStatusProto.getCreatePipelineAck())
+        .setCmdId(cmdStatusProto.getCmdId())
+        .setStatus(cmdStatusProto.getStatus())
+        .setType(cmdStatusProto.getType())
+        .setMsg(cmdStatusProto.getMsg())
+        .build();
+  }
+
+  @Override
+  public StorageContainerDatanodeProtocolProtos.CommandStatus
+      getProtoBufMessage() {
+    StorageContainerDatanodeProtocolProtos.CommandStatus.Builder builder =
+        StorageContainerDatanodeProtocolProtos.CommandStatus.newBuilder()
+            .setCmdId(this.getCmdId())
+            .setStatus(this.getStatus())
+            .setType(this.getType());
+    if (createPipelineAck != null) {
+      builder.setCreatePipelineAck(createPipelineAck);
+    }
+    if (this.getMsg() != null) {
+      builder.setMsg(this.getMsg());
+    }
+    return builder.build();
+  }
+
+  /**
+   * Builder for CreatePipelineCommandStatus.
+   */
+  public static final class CreatePipelineCommandStatusBuilder
+      extends CommandStatusBuilder {
+    private CreatePipelineACKProto createPipelineAck = null;
+
+    public static CreatePipelineCommandStatusBuilder newBuilder() {
+      return new CreatePipelineCommandStatusBuilder();
+    }
+
+    public CreatePipelineCommandStatusBuilder setCreatePipelineAck(
+        CreatePipelineACKProto ack) {
+      this.createPipelineAck = ack;
+      return this;
+    }
+
+    @Override
+    public CommandStatus build() {
+      return new CreatePipelineCommandStatus(getType(), getCmdId(), getStatus(),
+          getMsg(), createPipelineAck);
+    }
+  }
+}
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 1d09dfa..90124c6 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -177,6 +177,7 @@ message CommandStatus {
   required SCMCommandProto.Type type = 3;
   optional string msg = 4;
   optional ContainerBlocksDeletionACKProto blockDeletionAck = 5;
+  optional CreatePipelineACKProto createPipelineAck = 6;
 }
 
 message ContainerActionsProto {
@@ -243,6 +244,8 @@ message SCMCommandProto {
     closeContainerCommand = 3;
     deleteContainerCommand = 4;
     replicateContainerCommand = 5;
+    createPipelineCommand = 6;
+    closePipelineCommand = 7;
   }
   // TODO: once we start using protoc 3.x, refactor this message using "oneof"
   required Type commandType = 1;
@@ -251,6 +254,8 @@ message SCMCommandProto {
   optional CloseContainerCommandProto closeContainerCommandProto = 4;
   optional DeleteContainerCommandProto deleteContainerCommandProto = 5;
   optional ReplicateContainerCommandProto replicateContainerCommandProto = 6;
+  optional CreatePipelineCommandProto createPipelineCommandProto = 7;
+  optional ClosePipelineCommandProto closePipelineCommandProto = 8;
 }
 
 /**
@@ -320,6 +325,32 @@ message ReplicateContainerCommandProto {
 }
 
 /**
+This command asks the datanode to create a pipeline.
+*/
+message CreatePipelineCommandProto {
+  required PipelineID pipelineID = 1;
+  required ReplicationType type = 2;
+  required ReplicationFactor factor = 3;
+  repeated DatanodeDetailsProto datanode = 4;
+  required int64 cmdId = 5;
+}
+
+// ACK message sent from the datanode to SCM, carrying the result of the
+// pipeline creation.
+message CreatePipelineACKProto {
+  required PipelineID pipelineID = 1;
+  required string datanodeUUID = 2;
+}
+
+/**
+This command asks the datanode to close a pipeline.
+*/
+message ClosePipelineCommandProto {
+  required PipelineID pipelineID = 1;
+  required int64 cmdId = 2;
+}
+
+/**
  * Protocol used from a datanode to StorageContainerManager.
  *
  * Please see the request and response messages for details of the RPC calls.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index a0a7222..1390c52 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -24,8 +24,11 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.TimeUnit;
 import javax.management.ObjectName;
+
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -79,6 +82,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
 
   private ObjectName mxBean;
   private SafeModePrecheck safeModePrecheck;
+  private final long pipelineCreateWaitTimeout;
 
   /**
    * Constructor.
@@ -117,6 +121,22 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
             scm.getScmNodeManager(), scm.getEventQueue(), svcInterval,
             serviceTimeout, conf);
     safeModePrecheck = new SafeModePrecheck(conf);
+
+    long heartbeatInterval = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL,
+        HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    long commandStatusReportInterval = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL,
+        HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    Preconditions.checkState(heartbeatInterval <= commandStatusReportInterval,
+        "Heartbeat interval is smaller than command status report interval");
+    pipelineCreateWaitTimeout =
+        ((commandStatusReportInterval + heartbeatInterval - 1)
+            / heartbeatInterval) * heartbeatInterval + 5000L;
   }
 
   /**
@@ -188,6 +208,15 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           // TODO: #CLUTIL Remove creation logic when all replication types and
           // factors are handled by pipeline creator
           pipeline = pipelineManager.createPipeline(type, factor);
+          // wait until pipeline is ready
+          long current = System.currentTimeMillis();
+          while (!pipeline.isOpen() && System.currentTimeMillis() <
+              (current + pipelineCreateWaitTimeout)) {
+            try {
+              Thread.sleep(1000);
+            } catch (InterruptedException e) {
+              // restore the interrupt status and stop waiting
+              Thread.currentThread().interrupt();
+              break;
+            }
+          }
         } catch (IOException e) {
           LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
                   "get pipelines call once.", type, factor, e);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
index d1479f7..aad5573 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
@@ -19,8 +19,6 @@ package org.apache.hadoop.hdds.scm.command;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.CommandStatus;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
@@ -55,13 +53,21 @@ public class CommandStatusReportHandler implements
     cmdStatusList.forEach(cmdStatus -> {
       LOGGER.trace("Emitting command status for id:{} type: {}", cmdStatus
           .getCmdId(), cmdStatus.getType());
-      if (cmdStatus.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
+      switch (cmdStatus.getType()) {
+      case deleteBlocksCommand:
         if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
           publisher.fireEvent(SCMEvents.DELETE_BLOCK_STATUS,
               new DeleteBlockStatus(cmdStatus));
         }
-      } else {
-        LOGGER.debug("CommandStatus of type:{} not handled in " +
+        break;
+      case createPipelineCommand:
+        if (cmdStatus.getStatus() != CommandStatus.Status.PENDING) {
+          publisher.fireEvent(SCMEvents.CREATE_PIPELINE_STATUS,
+              new CreatePipelineStatus(cmdStatus));
+        }
+        break;
+      default:
+        LOGGER.warn("CommandStatus of type:{} not handled in " +
             "CommandStatusReportHandler.", cmdStatus.getType());
       }
     });
@@ -101,4 +107,12 @@ public class CommandStatusReportHandler implements
     }
   }
 
+  /**
+   * Wrapper event for CreatePipeline Command Status.
+   */
+  public static class CreatePipelineStatus extends CommandStatusEvent {
+    public CreatePipelineStatus(CommandStatus cmdStatus) {
+      super(cmdStatus);
+    }
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 43d396e..dd476a7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.hdds.scm.events;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteStatusList;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -40,6 +41,8 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .NodeReportFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeProtocolServer
     .NodeRegistrationContainerReport;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
+    .CreatePipelineStatus;
 import org.apache.hadoop.hdds.server.events.Event;
 import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -97,15 +100,14 @@ public final class SCMEvents {
           new TypedEvent<>(PipelineReportFromDatanode.class, "Pipeline_Report");
 
   /**
-   * PipelineReport processed by pipeline report handler. This event is
+   * Open pipeline event sent by SCMPipelineManager. This event is
    * received by HealthyPipelineSafeModeRule.
    */
-  public static final TypedEvent<PipelineReportFromDatanode>
-      PROCESSED_PIPELINE_REPORT = new TypedEvent<>(
-          PipelineReportFromDatanode.class, "Processed_Pipeline_Report");
+  public static final TypedEvent<Pipeline>
+      OPEN_PIPELINE = new TypedEvent<>(Pipeline.class, "Open_Pipeline");
 
   /**
-   * PipelineActions are sent by Datanode. This event is received by
+   * PipelineActions are sent by Datanode to close a pipeline. They are received by
    * SCMDatanodeHeartbeatDispatcher and PIPELINE_ACTIONS event is generated.
    */
   public static final TypedEvent<PipelineActionsFromDatanode>
@@ -113,7 +115,7 @@ public final class SCMEvents {
       "Pipeline_Actions");
 
   /**
-   * A Command status report will be sent by datanodes. This repoort is received
+   * A Command status report will be sent by datanodes. This report is received
    * by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated.
    */
   public static final TypedEvent<CommandStatusReportFromDatanode>
@@ -197,6 +199,14 @@ public final class SCMEvents {
       new TypedEvent<>(SafeModeStatus.class);
 
   /**
+   * This event is triggered by CommandStatusReportHandler whenever a
+   * status for CreatePipeline SCMCommand is received.
+   */
+  public static final TypedEvent<CreatePipelineStatus>
+      CREATE_PIPELINE_STATUS =
+      new TypedEvent<>(CreatePipelineStatus.class, "Create_Pipeline_Status");
+
+  /**
    * Private Ctor. Never Constructed.
    */
   private SCMEvents() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 77e037a..86ad5ee 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -24,7 +24,8 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
 
 import java.io.IOException;
 import java.util.HashMap;
@@ -39,12 +40,13 @@ public final class PipelineFactory {
   private Map<ReplicationType, PipelineProvider> providers;
 
   PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager,
-      Configuration conf, GrpcTlsConfig tlsConfig) {
+      Configuration conf, EventPublisher eventPublisher) {
     providers = new HashMap<>();
     providers.put(ReplicationType.STAND_ALONE,
         new SimplePipelineProvider(nodeManager));
     providers.put(ReplicationType.RATIS,
-        new RatisPipelineProvider(nodeManager, stateManager, conf, tlsConfig));
+        new RatisPipelineProvider(nodeManager, stateManager, conf,
+            eventPublisher));
   }
 
   @VisibleForTesting
@@ -63,6 +65,11 @@ public final class PipelineFactory {
     return providers.get(type).create(factor, nodes);
   }
 
+  public void close(ReplicationType type, Pipeline pipeline)
+      throws IOException {
+    providers.get(type).close(pipeline);
+  }
+
   public void shutdown() {
     providers.values().forEach(provider -> provider.shutdown());
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 9ba5f31..e477860 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -21,8 +21,9 @@ package org.apache.hadoop.hdds.scm.pipeline;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.hadoop.hdds.server.events.EventHandler;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -33,7 +34,8 @@ import java.util.NavigableSet;
 /**
  * Interface which exposes the api for pipeline management.
  */
-public interface PipelineManager extends Closeable, PipelineManagerMXBean {
+public interface PipelineManager extends Closeable, PipelineManagerMXBean,
+    EventHandler<CommandStatusReportHandler.CreatePipelineStatus> {
 
   Pipeline createPipeline(ReplicationType type, ReplicationFactor factor)
       throws IOException;
@@ -51,6 +53,9 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
       ReplicationFactor factor);
 
   List<Pipeline> getPipelines(ReplicationType type,
+      Pipeline.PipelineState state);
+
+  List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor, Pipeline.PipelineState state);
 
   List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
@@ -94,6 +99,4 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
    * @throws IOException in case of any Exception
    */
   void deactivatePipeline(PipelineID pipelineID) throws IOException;
-
-  GrpcTlsConfig getGrpcTlsConfig();
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index a0ce216..c00ff78 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -33,5 +33,7 @@ public interface PipelineProvider {
 
   Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);
 
+  void close(Pipeline pipeline) throws IOException;
+
   void shutdown();
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index 2b11da9..6b9a839 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -26,17 +26,18 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.server
     .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Objects;
 
 /**
  * Handles Pipeline Reports from datanode.
@@ -52,17 +53,14 @@ public class PipelineReportHandler implements
   private final boolean pipelineAvailabilityCheck;
 
   public PipelineReportHandler(SCMSafeModeManager scmSafeModeManager,
-      PipelineManager pipelineManager,
-      Configuration conf) {
+      PipelineManager pipelineManager, Configuration conf) {
     Preconditions.checkNotNull(pipelineManager);
-    Objects.requireNonNull(scmSafeModeManager);
     this.scmSafeModeManager = scmSafeModeManager;
     this.pipelineManager = pipelineManager;
     this.conf = conf;
     this.pipelineAvailabilityCheck = conf.getBoolean(
         HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK,
         HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT);
-
   }
 
   @Override
@@ -77,28 +75,26 @@ public class PipelineReportHandler implements
     LOGGER.trace("Processing pipeline report for dn: {}", dn);
     for (PipelineReport report : pipelineReport.getPipelineReportList()) {
       try {
-        processPipelineReport(report, dn);
+        processPipelineReport(report, dn, publisher);
       } catch (IOException e) {
         LOGGER.error("Could not process pipeline report={} from dn={} {}",
             report, dn, e);
       }
     }
-    if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
-      publisher.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          pipelineReportFromDatanode);
-    }
-
   }
 
-  private void processPipelineReport(PipelineReport report, DatanodeDetails dn)
-      throws IOException {
+  private void processPipelineReport(PipelineReport report, DatanodeDetails dn,
+      EventPublisher publisher) throws IOException {
     PipelineID pipelineID = PipelineID.getFromProtobuf(report.getPipelineID());
     Pipeline pipeline;
     try {
       pipeline = pipelineManager.getPipeline(pipelineID);
     } catch (PipelineNotFoundException e) {
-      RatisPipelineUtils.destroyPipeline(dn, pipelineID, conf,
-          pipelineManager.getGrpcTlsConfig());
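+      // Pipeline is unknown to SCM; ask the reporting datanode to close it.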
+      final ClosePipelineCommand closeCommand =
+          new ClosePipelineCommand(pipelineID);
+      final CommandForDatanode datanodeCommand =
+          new CommandForDatanode<>(dn.getUuid(), closeCommand);
+      publisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
       return;
     }
 
@@ -108,6 +104,10 @@ public class PipelineReportHandler implements
       if (pipeline.isHealthy()) {
         // if all the dns have reported, pipeline can be moved to OPEN state
         pipelineManager.openPipeline(pipelineID);
+      } else {
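+        // While still in safe mode, let the safe mode rules account for
+        // this partially reported pipeline.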
+        if (pipelineAvailabilityCheck && scmSafeModeManager.getInSafeMode()) {
+          publisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
+        }
       }
     } else {
       // In OPEN state case just report the datanode
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index 7615057..93fbbd1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -131,9 +131,9 @@ class PipelineStateManager {
       throw new IOException("Closed pipeline can not be opened");
     }
     if (pipeline.getPipelineState() == PipelineState.ALLOCATED) {
+      LOG.info("Pipeline {} moved to OPEN state", pipeline.toString());
       pipeline = pipelineStateMap
           .updatePipelineState(pipelineId, PipelineState.OPEN);
-      LOG.info("Pipeline {} moved to OPEN state", pipeline.toString());
     }
     return pipeline;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 9409728..b9aff86 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -24,37 +24,29 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms
+    .SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
-import org.apache.hadoop.io.MultipleIOException;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.util.TimeDuration;
-import org.apache.ratis.util.function.CheckedBiConsumer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.ForkJoinWorkerThread;
-import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
@@ -69,6 +61,7 @@ public class RatisPipelineProvider implements PipelineProvider {
   private final NodeManager nodeManager;
   private final PipelineStateManager stateManager;
   private final Configuration conf;
+  private final EventPublisher eventPublisher;
 
   // Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
   private final int parallelismForPool = 3;
@@ -83,15 +76,14 @@ public class RatisPipelineProvider implements PipelineProvider {
 
   private final ForkJoinPool forkJoinPool = new ForkJoinPool(
       parallelismForPool, factory, null, false);
-  private final GrpcTlsConfig tlsConfig;
 
   RatisPipelineProvider(NodeManager nodeManager,
       PipelineStateManager stateManager, Configuration conf,
-      GrpcTlsConfig tlsConfig) {
+      EventPublisher eventPublisher) {
     this.nodeManager = nodeManager;
     this.stateManager = stateManager;
     this.conf = conf;
-    this.tlsConfig = tlsConfig;
+    this.eventPublisher = eventPublisher;
   }
 
 
@@ -155,12 +147,25 @@ public class RatisPipelineProvider implements PipelineProvider {
 
     Pipeline pipeline = Pipeline.newBuilder()
         .setId(PipelineID.randomId())
-        .setState(PipelineState.OPEN)
+        .setState(PipelineState.ALLOCATED)
         .setType(ReplicationType.RATIS)
         .setFactor(factor)
         .setNodes(dns)
         .build();
-    initializePipeline(pipeline);
+
+    // Send command to datanode to create pipeline
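+    // The pipeline stays in ALLOCATED state until SCMPipelineManager
+    // receives the datanodes' acks and moves it to OPEN.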
+    final CreatePipelineCommand createCommand =
+        new CreatePipelineCommand(pipeline.getId(), pipeline.getType(),
+            factor, dns);
+
+    dns.forEach(node -> {
+      final CommandForDatanode datanodeCommand =
+          new CommandForDatanode<>(node.getUuid(), createCommand);
+      LOG.info("Send pipeline:{} create command to datanode {}",
+          pipeline.getId(), datanodeCommand.getDatanodeId());
+      eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+    });
+
     return pipeline;
   }
 
@@ -188,67 +193,24 @@ public class RatisPipelineProvider implements PipelineProvider {
     }
   }
 
-  protected void initializePipeline(Pipeline pipeline) throws IOException {
+  /**
+   * Closes the pipeline by sending a close command to all its datanodes.
+   * Removal of the pipeline from SCM is handled by the caller.
+   *
+   * @param pipeline - Pipeline to be closed
+   */
+  public void close(Pipeline pipeline) {
     final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    LOG.debug("creating pipeline:{} with {}", pipeline.getId(), group);
-    callRatisRpc(pipeline.getNodes(),
-        (raftClient, peer) -> {
-          RaftClientReply reply = raftClient.groupAdd(group, peer.getId());
-          if (reply == null || !reply.isSuccess()) {
-            String msg = "Pipeline initialization failed for pipeline:"
-                + pipeline.getId() + " node:" + peer.getId();
-            LOG.error(msg);
-            throw new IOException(msg);
-          }
-        });
-  }
-
-  private void callRatisRpc(List<DatanodeDetails> datanodes,
-      CheckedBiConsumer< RaftClient, RaftPeer, IOException> rpc)
-      throws IOException {
-    if (datanodes.isEmpty()) {
-      return;
-    }
-
-    final String rpcType = conf
-        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(conf);
-    final List< IOException > exceptions =
-        Collections.synchronizedList(new ArrayList<>());
-    final int maxOutstandingRequests =
-        HddsClientUtils.getMaxOutstandingRequests(conf);
-    final TimeDuration requestTimeout =
-        RatisHelper.getClientRequestTimeout(conf);
-    try {
-      forkJoinPool.submit(() -> {
-        datanodes.parallelStream().forEach(d -> {
-          final RaftPeer p = RatisHelper.toRaftPeer(d);
-          try (RaftClient client = RatisHelper
-              .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
-                  retryPolicy, maxOutstandingRequests, tlsConfig,
-                  requestTimeout)) {
-            rpc.accept(client, p);
-          } catch (IOException ioe) {
-            String errMsg =
-                "Failed invoke Ratis rpc " + rpc + " for " + d.getUuid();
-            LOG.error(errMsg, ioe);
-            exceptions.add(new IOException(errMsg, ioe));
-          }
-        });
-      }).get();
-    } catch (ExecutionException | RejectedExecutionException ex) {
-      LOG.error(ex.getClass().getName() + " exception occurred during " +
-          "createPipeline", ex);
-      throw new IOException(ex.getClass().getName() + " exception occurred " +
-          "during createPipeline", ex);
-    } catch (InterruptedException ex) {
-      Thread.currentThread().interrupt();
-      throw new IOException("Interrupt exception occurred during " +
-          "createPipeline", ex);
-    }
-    if (!exceptions.isEmpty()) {
-      throw MultipleIOException.createIOException(exceptions);
-    }
+    LOG.debug("Destroy pipeline:{} with {} ", pipeline.getId(), group);
+    final ClosePipelineCommand closeCommand =
+        new ClosePipelineCommand(pipeline.getId());
+    pipeline.getNodes().forEach(node -> {
+      final CommandForDatanode datanodeCommand =
+          new CommandForDatanode<>(node.getUuid(), closeCommand);
+      LOG.info("Send pipeline:{} close command to datanode {}",
+          pipeline.getId(), datanodeCommand.getDatanodeId());
+      eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+    });
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
deleted file mode 100644
index 777a0b0..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.ratis.RatisHelper;
-import org.apache.ratis.client.RaftClient;
-import org.apache.ratis.grpc.GrpcTlsConfig;
-import org.apache.ratis.protocol.RaftGroup;
-import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.retry.RetryPolicy;
-import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.util.TimeDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Utility class for Ratis pipelines. Contains methods to create and destroy
- * ratis pipelines.
- */
-public final class RatisPipelineUtils {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(RatisPipelineUtils.class);
-
-  private RatisPipelineUtils() {
-  }
-  /**
-   * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
-   * the datanodes.
-   *
-   * @param pipeline        - Pipeline to be destroyed
-   * @param ozoneConf       - Ozone configuration
-   * @param grpcTlsConfig
-   * @throws IOException
-   */
-  static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf,
-      GrpcTlsConfig grpcTlsConfig) {
-    final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
-    LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
-    for (DatanodeDetails dn : pipeline.getNodes()) {
-      try {
-        destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
-      } catch (IOException e) {
-        LOG.warn("Pipeline destroy failed for pipeline={} dn={}",
-            pipeline.getId(), dn);
-      }
-    }
-  }
-
-  /**
-   * Sends ratis command to destroy pipeline on the given datanode.
-   *
-   * @param dn         - Datanode on which pipeline needs to be destroyed
-   * @param pipelineID - ID of pipeline to be destroyed
-   * @param ozoneConf  - Ozone configuration
-   * @param grpcTlsConfig - grpc tls configuration
-   * @throws IOException
-   */
-  static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
-      Configuration ozoneConf, GrpcTlsConfig grpcTlsConfig) throws IOException {
-    final String rpcType = ozoneConf
-        .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
-            ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
-    final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
-    final RaftPeer p = RatisHelper.toRaftPeer(dn);
-    final int maxOutstandingRequests =
-        HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
-    final TimeDuration requestTimeout =
-        RatisHelper.getClientRequestTimeout(ozoneConf);
-    try(RaftClient client = RatisHelper
-        .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
-            retryPolicy, maxOutstandingRequests, grpcTlsConfig,
-            requestTimeout)) {
-      client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()),
-          true, p.getId());
-    }
-  }
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 0964f6d..405baa8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -21,23 +21,30 @@ package org.apache.hadoop.hdds.scm.pipeline;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CreatePipelineACKProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatus;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
+    .CreatePipelineStatus;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
 import org.apache.hadoop.hdds.utils.MetadataStore;
 import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
 import org.apache.hadoop.hdds.utils.Scheduler;
-import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -81,18 +88,20 @@ public class SCMPipelineManager implements PipelineManager {
   private final NodeManager nodeManager;
   private final SCMPipelineMetrics metrics;
   private final Configuration conf;
+  private final StorageContainerManager scm;
+  private boolean pipelineAvailabilityCheck;
+  private boolean createPipelineInSafemode;
   // Pipeline Manager MXBean
   private ObjectName pmInfoBean;
-  private GrpcTlsConfig grpcTlsConfig;
 
   public SCMPipelineManager(Configuration conf, NodeManager nodeManager,
-      EventPublisher eventPublisher, GrpcTlsConfig grpcTlsConfig)
+      EventPublisher eventPublisher, StorageContainerManager scm)
       throws IOException {
     this.lock = new ReentrantReadWriteLock();
     this.conf = conf;
     this.stateManager = new PipelineStateManager(conf);
     this.pipelineFactory = new PipelineFactory(nodeManager, stateManager,
-        conf, grpcTlsConfig);
+        conf, eventPublisher);
     // TODO: See if thread priority needs to be set for these threads
     scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
     this.backgroundPipelineCreator =
@@ -113,8 +122,14 @@ public class SCMPipelineManager implements PipelineManager {
     this.metrics = SCMPipelineMetrics.create();
     this.pmInfoBean = MBeans.register("SCMPipelineManager",
         "SCMPipelineManagerInfo", this);
+    this.scm = scm;
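+    // pipelineAvailabilityCheck gates safe mode exit on pipeline state;
+    // createPipelineInSafemode allows creating pipelines during safe mode.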
+    this.pipelineAvailabilityCheck = conf.getBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK,
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK_DEFAULT);
+    this.createPipelineInSafemode = conf.getBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
     initializePipelineState();
-    this.grpcTlsConfig = grpcTlsConfig;
   }
 
   public PipelineStateManager getStateManager() {
@@ -130,6 +145,9 @@ public class SCMPipelineManager implements PipelineManager {
   private void initializePipelineState() throws IOException {
     if (pipelineStore.isEmpty()) {
       LOG.info("No pipeline exists in current db");
+      if (pipelineAvailabilityCheck && createPipelineInSafemode) {
+        startPipelineCreator();
+      }
       return;
     }
     List<Map.Entry<byte[], byte[]>> pipelines =
@@ -148,8 +166,8 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   @Override
-  public synchronized Pipeline createPipeline(
-      ReplicationType type, ReplicationFactor factor) throws IOException {
+  public synchronized Pipeline createPipeline(ReplicationType type,
+      ReplicationFactor factor) throws IOException {
     lock.writeLock().lock();
     try {
       Pipeline pipeline = pipelineFactory.create(type, factor);
@@ -157,8 +175,11 @@ public class SCMPipelineManager implements PipelineManager {
           pipeline.getProtobufMessage().toByteArray());
       stateManager.addPipeline(pipeline);
       nodeManager.addPipeline(pipeline);
-      metrics.incNumPipelineCreated();
-      metrics.createPerPipelineMetrics(pipeline);
+      metrics.incNumPipelineAllocated();
+      if (pipeline.isOpen()) {
+        metrics.incNumPipelineCreated();
+        metrics.createPerPipelineMetrics(pipeline);
+      }
       return pipeline;
     } catch (InsufficientDatanodesException idEx) {
       throw idEx;
@@ -225,6 +246,16 @@ public class SCMPipelineManager implements PipelineManager {
     }
   }
 
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+      Pipeline.PipelineState state) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines(type, state);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
   @Override
   public List<Pipeline> getPipelines(ReplicationType type,
       ReplicationFactor factor, Pipeline.PipelineState state) {
@@ -293,7 +324,9 @@ public class SCMPipelineManager implements PipelineManager {
     lock.writeLock().lock();
     try {
       Pipeline pipeline = stateManager.openPipeline(pipelineId);
-      metrics.createPerPipelineMetrics(pipeline);
+      if (pipelineAvailabilityCheck && scm != null && scm.isInSafeMode()) {
+        eventPublisher.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
+      }
     } finally {
       lock.writeLock().unlock();
     }
@@ -408,7 +441,7 @@ public class SCMPipelineManager implements PipelineManager {
    * @throws IOException
    */
   private void destroyPipeline(Pipeline pipeline) throws IOException {
-    RatisPipelineUtils.destroyPipeline(pipeline, conf, grpcTlsConfig);
+    pipelineFactory.close(pipeline.getType(), pipeline);
     // remove the pipeline from the pipeline manager
     removePipeline(pipeline.getId());
     triggerPipelineCreation();
@@ -441,11 +474,6 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   @Override
-  public GrpcTlsConfig getGrpcTlsConfig() {
-    return grpcTlsConfig;
-  }
-
-  @Override
   public void close() throws IOException {
     if (scheduler != null) {
       scheduler.close();
@@ -466,4 +494,63 @@ public class SCMPipelineManager implements PipelineManager {
     // shutdown pipeline provider.
     pipelineFactory.shutdown();
   }
+
+  @Override
+  public void onMessage(CreatePipelineStatus response,
+      EventPublisher publisher) {
+    CommandStatus result = response.getCmdStatus();
+    CreatePipelineACKProto ack = result.getCreatePipelineAck();
+    PipelineID pipelineID = PipelineID.getFromProtobuf(ack.getPipelineID());
+    CommandStatus.Status status = result.getStatus();
+    LOG.info("receive pipeline {} create status {}", pipelineID, status);
+    Pipeline pipeline;
+    try {
+      pipeline = stateManager.getPipeline(pipelineID);
+    } catch (PipelineNotFoundException e) {
+      LOG.warn("Pipeline {} cannot be found", pipelineID);
+      return;
+    }
+
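+    // An EXECUTED ack records the reporting datanode and may open the
+    // pipeline; a FAILED ack finalizes and destroys it.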
+    switch (status) {
+    case EXECUTED:
+      DatanodeDetails dn = nodeManager.getNodeByUuid(ack.getDatanodeUUID());
+      if (dn == null) {
+        LOG.warn("Datanode {} for Pipeline {} cannot be found",
+            ack.getDatanodeUUID(), pipelineID);
+        return;
+      }
+      try {
+        pipeline.reportDatanode(dn);
+      } catch (IOException e) {
+        LOG.warn("Update {} for Pipeline {} failed for {}",
+            dn.getUuidString(), pipelineID, e.getMessage());
+        return;
+      }
+      // If all datanodes are updated, we believe pipeline is ready to OPEN
+      if (pipeline.isHealthy() ||
+          pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
+        try {
+          openPipeline(pipelineID);
+        } catch (IOException e) {
+          LOG.warn("Fail to open Pipeline {} for {}", pipelineID,
+              e.getMessage());
+          return;
+        }
+        metrics.incNumPipelineCreated();
+        metrics.createPerPipelineMetrics(pipeline);
+      }
+      return;
+    case FAILED:
+      metrics.incNumPipelineCreationFailed();
+      try {
+        finalizeAndDestroyPipeline(pipeline, false);
+      } catch (IOException e) {
+        LOG.warn("Fail to close Pipeline {} for {}", pipelineID,
+            e.getMessage());
+      }
+      return;
+    default:
+      LOG.error("Unknown or unexpected status {}", status);
+    }
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index d0f7f6e..fa91572 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -46,6 +46,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
 
   private MetricsRegistry registry;
 
+  private @Metric MutableCounterLong numPipelineAllocated;
   private @Metric MutableCounterLong numPipelineCreated;
   private @Metric MutableCounterLong numPipelineCreationFailed;
   private @Metric MutableCounterLong numPipelineDestroyed;
@@ -83,6 +84,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
   @SuppressWarnings("SuspiciousMethodCalls")
   public void getMetrics(MetricsCollector collector, boolean all) {
     MetricsRecordBuilder recordBuilder = collector.addRecord(SOURCE_NAME);
+    numPipelineAllocated.snapshot(recordBuilder, true);
     numPipelineCreated.snapshot(recordBuilder, true);
     numPipelineCreationFailed.snapshot(recordBuilder, true);
     numPipelineDestroyed.snapshot(recordBuilder, true);
@@ -94,6 +96,7 @@ public final class SCMPipelineMetrics implements MetricsSource {
   }
 
   void createPerPipelineMetrics(Pipeline pipeline) {
+    System.out.println("add pipeline " +  pipeline.getId() + " to metrics map");
     numBlocksAllocated.put(pipeline.getId(), new MutableCounterLong(Interns
         .info(getBlockAllocationMetricName(pipeline),
             "Number of blocks allocated in pipeline " + pipeline.getId()), 0L));
@@ -117,6 +120,14 @@ public final class SCMPipelineMetrics implements MetricsSource {
   }
 
   /**
+   * Increments the count of allocated pipelines, whether creation later
+   * succeeds or fails.
+   */
+  void incNumPipelineAllocated() {
+    numPipelineAllocated.incr();
+  }
+
+  /**
    * Increments number of successful pipeline creation count.
    */
   void incNumPipelineCreated() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index 54e2141..a772a97 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -74,6 +74,11 @@ public class SimplePipelineProvider implements PipelineProvider {
   }
 
   @Override
+  public void close(Pipeline pipeline) throws IOException {
+    // Do nothing; standalone pipelines have no datanode state to tear down.
+  }
+
+  @Override
   public void shutdown() {
     // Do nothing.
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 7a00d76..5304270 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -20,17 +20,10 @@ package org.apache.hadoop.hdds.scm.safemode;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
-
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -38,9 +31,6 @@ import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.HashSet;
-import java.util.Set;
-
 /**
  * Class defining Safe mode exit criteria for Pipelines.
  *
@@ -49,43 +39,52 @@ import java.util.Set;
  * through in a cluster.
  */
 public class HealthyPipelineSafeModeRule
-    extends SafeModeExitRule<PipelineReportFromDatanode>{
+    extends SafeModeExitRule<Pipeline>{
 
   public static final Logger LOG =
       LoggerFactory.getLogger(HealthyPipelineSafeModeRule.class);
-  private final PipelineManager pipelineManager;
   private final int healthyPipelineThresholdCount;
   private int currentHealthyPipelineCount = 0;
-  private final Set<DatanodeDetails> processedDatanodeDetails =
-      new HashSet<>();
 
   HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
       PipelineManager pipelineManager,
       SCMSafeModeManager manager, Configuration configuration) {
     super(manager, ruleName, eventQueue);
-    this.pipelineManager = pipelineManager;
     double healthyPipelinesPercent =
         configuration.getDouble(HddsConfigKeys.
                 HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT,
             HddsConfigKeys.
                 HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT_DEFAULT);
 
+    int minHealthyPipelines = 0;
+
+    boolean createPipelineInSafemode = configuration.getBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
+
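+    // When pipeline creation is allowed during safe mode, require at least
+    // a configured minimum of healthy pipelines before this rule passes.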
+    if (createPipelineInSafemode) {
+      minHealthyPipelines =
+          configuration.getInt(HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE,
+              HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_PIPELINE_DEFAULT);
+    }
+
     Preconditions.checkArgument(
         (healthyPipelinesPercent >= 0.0 && healthyPipelinesPercent <= 1.0),
         HddsConfigKeys.
             HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT
             + " value should be >= 0.0 and <= 1.0");
 
-    // As we want to wait for 3 node pipelines
-    int pipelineCount =
+    // We wait for all RATIS write pipelines, whether factor ONE or THREE.
+    int pipelineCount = pipelineManager.getPipelines(
+        HddsProtos.ReplicationType.RATIS, Pipeline.PipelineState.OPEN).size() +
         pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE).size();
+            Pipeline.PipelineState.ALLOCATED).size();
 
     // This value will be zero when pipeline count is 0.
     // On a fresh installed cluster, there will be zero pipelines in the SCM
     // pipeline DB.
-    healthyPipelineThresholdCount =
-        (int) Math.ceil(healthyPipelinesPercent * pipelineCount);
+    healthyPipelineThresholdCount = Math.max(minHealthyPipelines,
+        (int) Math.ceil(healthyPipelinesPercent * pipelineCount));
 
     LOG.info(" Total pipeline count is {}, healthy pipeline " +
         "threshold count is {}", pipelineCount, healthyPipelineThresholdCount);
@@ -95,67 +94,41 @@ public class HealthyPipelineSafeModeRule
   }
 
   @Override
-  protected TypedEvent<PipelineReportFromDatanode> getEventType() {
-    return SCMEvents.PROCESSED_PIPELINE_REPORT;
+  protected TypedEvent<Pipeline> getEventType() {
+    return SCMEvents.OPEN_PIPELINE;
   }
 
   @Override
   protected boolean validate() {
     if (currentHealthyPipelineCount >= healthyPipelineThresholdCount) {
+      LOG.info("{} rule satisfied", this.getClass().getSimpleName());
       return true;
     }
     return false;
   }
 
   @Override
-  protected void process(PipelineReportFromDatanode
-      pipelineReportFromDatanode) {
+  protected void process(Pipeline pipeline) {
 
     // When SCM is in safe mode for long time, already registered
-    // datanode can send pipeline report again, then pipeline handler fires
-    // processed report event, we should not consider this pipeline report
-    // from datanode again during threshold calculation.
-    Preconditions.checkNotNull(pipelineReportFromDatanode);
-    DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails();
-    if (!processedDatanodeDetails.contains(
-        pipelineReportFromDatanode.getDatanodeDetails())) {
-
-      Pipeline pipeline;
-      PipelineReportsProto pipelineReport =
-          pipelineReportFromDatanode.getReport();
-
-      for (PipelineReport report : pipelineReport.getPipelineReportList()) {
-        PipelineID pipelineID = PipelineID
-            .getFromProtobuf(report.getPipelineID());
-        try {
-          pipeline = pipelineManager.getPipeline(pipelineID);
-        } catch (PipelineNotFoundException e) {
-          continue;
-        }
-
-        if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
-            pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
-          // If the pipeline is open state mean, all 3 datanodes are reported
-          // for this pipeline.
-          currentHealthyPipelineCount++;
-          getSafeModeMetrics().incCurrentHealthyPipelinesCount();
-        }
-      }
-      if (scmInSafeMode()) {
-        SCMSafeModeManager.getLogger().info(
-            "SCM in safe mode. Healthy pipelines reported count is {}, " +
-                "required healthy pipeline reported count is {}",
-            currentHealthyPipelineCount, healthyPipelineThresholdCount);
-      }
-
-      processedDatanodeDetails.add(dnDetails);
+    // datanodes can send pipeline reports again, or SCMPipelineManager may
+    // create new pipelines.
+    Preconditions.checkNotNull(pipeline);
+    if (pipeline.getType() == HddsProtos.ReplicationType.RATIS) {
+      currentHealthyPipelineCount++;
+      getSafeModeMetrics().incCurrentHealthyPipelinesCount();
     }
 
+    if (scmInSafeMode()) {
+      SCMSafeModeManager.getLogger().info(
+          "SCM in safe mode. Healthy pipelines reported count is {}, " +
+              "required healthy pipeline reported count is {}",
+          currentHealthyPipelineCount, healthyPipelineThresholdCount);
+    }
   }
 
   @Override
   protected void cleanup() {
-    processedDatanodeDetails.clear();
   }
 
   @VisibleForTesting
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
index 841d8ff..34ba35c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/OneReplicaPipelineSafeModeRule.java
@@ -22,17 +22,10 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.
-    PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdds.server.events.TypedEvent;
 import org.slf4j.Logger;
@@ -47,14 +40,13 @@ import java.util.Set;
  * replica available for read when we exit safe mode.
  */
 public class OneReplicaPipelineSafeModeRule extends
-    SafeModeExitRule<PipelineReportFromDatanode> {
+    SafeModeExitRule<Pipeline> {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(OneReplicaPipelineSafeModeRule.class);
 
   private int thresholdCount;
   private Set<PipelineID> reportedPipelineIDSet = new HashSet<>();
-  private final PipelineManager pipelineManager;
   private int currentReportedPipelineCount = 0;
 
 
@@ -62,7 +54,6 @@ public class OneReplicaPipelineSafeModeRule extends
       PipelineManager pipelineManager,
       SCMSafeModeManager safeModeManager, Configuration configuration) {
     super(safeModeManager, ruleName, eventQueue);
-    this.pipelineManager = pipelineManager;
 
     double percent =
         configuration.getDouble(
@@ -75,69 +66,59 @@ public class OneReplicaPipelineSafeModeRule extends
             HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT  +
             " value should be >= 0.0 and <= 1.0");
 
+    // Count OPEN and ALLOCATED pipelines; exclude CLOSED ones.
     int totalPipelineCount =
         pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
-            HddsProtos.ReplicationFactor.THREE).size();
+            HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
+            .size() +
+            pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS,
+                HddsProtos.ReplicationFactor.THREE,
+                Pipeline.PipelineState.ALLOCATED).size();
 
     thresholdCount = (int) Math.ceil(percent * totalPipelineCount);
 
-    LOG.info(" Total pipeline count is {}, pipeline's with atleast one " +
+    LOG.info("Total pipeline count is {}, pipeline's with at least one " +
         "datanode reported threshold count is {}", totalPipelineCount,
         thresholdCount);
 
     getSafeModeMetrics().setNumPipelinesWithAtleastOneReplicaReportedThreshold(
         thresholdCount);
-
   }
 
   @Override
-  protected TypedEvent<PipelineReportFromDatanode> getEventType() {
-    return SCMEvents.PROCESSED_PIPELINE_REPORT;
+  protected TypedEvent<Pipeline> getEventType() {
+    return SCMEvents.OPEN_PIPELINE;
   }
 
   @Override
   protected boolean validate() {
     if (currentReportedPipelineCount >= thresholdCount) {
+      LOG.info("{} rule satisfied", this.getClass().getSimpleName());
       return true;
     }
     return false;
   }
 
   @Override
-  protected void process(PipelineReportFromDatanode
-      pipelineReportFromDatanode) {
-    Pipeline pipeline;
-    Preconditions.checkNotNull(pipelineReportFromDatanode);
-    PipelineReportsProto pipelineReport =
-        pipelineReportFromDatanode.getReport();
-
-    for (PipelineReport report : pipelineReport.getPipelineReportList()) {
-      PipelineID pipelineID = PipelineID
-          .getFromProtobuf(report.getPipelineID());
-      try {
-        pipeline = pipelineManager.getPipeline(pipelineID);
-      } catch (PipelineNotFoundException e) {
-        continue;
-      }
-
-      if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
-          !reportedPipelineIDSet.contains(pipelineID)) {
-        reportedPipelineIDSet.add(pipelineID);
-        getSafeModeMetrics()
-            .incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount();
-      }
+  protected void process(Pipeline pipeline) {
+    Preconditions.checkNotNull(pipeline);
+    if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
+        pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+        !reportedPipelineIDSet.contains(pipeline.getId())) {
+      reportedPipelineIDSet.add(pipeline.getId());
+      getSafeModeMetrics()
+          .incCurrentHealthyPipelinesWithAtleastOneReplicaReportedCount();
     }
 
     currentReportedPipelineCount = reportedPipelineIDSet.size();
 
     if (scmInSafeMode()) {
       SCMSafeModeManager.getLogger().info(
-          "SCM in safe mode. Pipelines with atleast one datanode reported " +
-              "count is {}, required atleast one datanode reported per " +
+          "SCM in safe mode. Pipelines with at least one datanode reported " +
+              "count is {}, required at least one datanode reported per " +
               "pipeline count is {}",
           currentReportedPipelineCount, thresholdCount);
     }
-
   }
 
   @Override
@@ -154,5 +135,4 @@ public class OneReplicaPipelineSafeModeRule extends
   public int getCurrentReportedPipelineCount() {
     return currentReportedPipelineCount;
   }
-
-}
+}
\ No newline at end of file
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
index a22d162..1bad4cb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
@@ -59,17 +59,17 @@ import org.slf4j.LoggerFactory;
  * number of datanode registered is met or not.
  *
  * 3. HealthyPipelineSafeModeRule:
- * Once the pipelineReportHandler processes the
- * {@link SCMEvents#PIPELINE_REPORT}, it fires
- * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
+ * Once the SCMPipelineManager processes the
+ * {@link SCMEvents#CREATE_PIPELINE_STATUS}, it fires
+ * {@link SCMEvents#OPEN_PIPELINE}. This rule handles this
  * event. This rule processes this report, and check if pipeline is healthy
  * and increments current healthy pipeline count. Then validate it cutoff
  * threshold for healthy pipeline is met or not.
  *
  * 4. OneReplicaPipelineSafeModeRule:
- * Once the pipelineReportHandler processes the
- * {@link SCMEvents#PIPELINE_REPORT}, it fires
- * {@link SCMEvents#PROCESSED_PIPELINE_REPORT}. This rule handles this
+ * Once the SCMPipelineManager processes the
+ * {@link SCMEvents#CREATE_PIPELINE_STATUS}, it fires
+ * {@link SCMEvents#OPEN_PIPELINE}. This rule handles this
  * event. This rule processes this report, and add the reported pipeline to
  * reported pipeline set. Then validate it cutoff threshold for one replica
  * per pipeline is met or not.
@@ -166,6 +166,7 @@ public class SCMSafeModeManager {
 
     if (exitRules.get(ruleName) != null) {
       validatedRules.add(ruleName);
+      LOG.info("{} rule is successfully validated", ruleName);
     } else {
       // This should never happen
       LOG.error("No Such Exit rule {}", ruleName);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
index b9e5333..6128266 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SafeModeHandler.java
@@ -129,7 +129,8 @@ public class SafeModeHandler implements EventHandler<SafeModeStatus> {
     List<Pipeline> pipelineList = scmPipelineManager.getPipelines();
     pipelineList.forEach((pipeline) -> {
       try {
-        if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
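+        // Destroy only ALLOCATED pipelines whose creation has timed out.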
+        if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
+            pipeline.isAllocationTimeout()) {
           scmPipelineManager.finalizeAndDestroyPipeline(pipeline, false);
         }
       } catch (IOException ex) {
@@ -142,6 +143,4 @@ public class SafeModeHandler implements EventHandler<SafeModeStatus> {
   public boolean getSafeModeStatus() {
     return isInSafeMode.get();
   }
-
-
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index 9f6077b..3dbb4cb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -164,6 +164,7 @@ public final class SCMDatanodeHeartbeatDispatcher {
       }
 
       if (heartbeat.getCommandStatusReportsCount() != 0) {
+        LOG.debug("Dispatching Command Status Report.");
         for (CommandStatusReportsProto commandStatusReport : heartbeat
             .getCommandStatusReportsList()) {
           eventPublisher.fireEvent(
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 6dd9dab..11c1a44 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -57,6 +57,12 @@ import static org.apache.hadoop.hdds.protocol.proto
     .Type.closeContainerCommand;
 import static org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto
+    .Type.closePipelineCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto
+    .Type.createPipelineCommand;
+import static org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto
     .Type.deleteBlocksCommand;
 import static org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto
@@ -90,6 +96,8 @@ import org.apache.hadoop.ozone.audit.Auditor;
 import org.apache.hadoop.ozone.audit.SCMAction;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
+import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@@ -350,6 +358,18 @@ public class SCMDatanodeProtocolServer implements
           .setReplicateContainerCommandProto(
               ((ReplicateContainerCommand)cmd).getProto())
           .build();
+    case createPipelineCommand:
+      return builder
+          .setCommandType(createPipelineCommand)
+          .setCreatePipelineCommandProto(
+              ((CreatePipelineCommand)cmd).getProto())
+          .build();
+    case closePipelineCommand:
+      return builder
+          .setCommandType(closePipelineCommand)
+          .setClosePipelineCommandProto(
+              ((ClosePipelineCommand)cmd).getProto())
+          .build();
     default:
       throw new IllegalArgumentException("Scm command " +
           cmd.getType().toString() + " is not implemented");
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index c25b3a0..7c2cffa 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -356,6 +356,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
     eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
     eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS, safeModeHandler);
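+    // Route create pipeline acks from datanodes to the pipeline manager.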
+    eventQueue.addHandler(SCMEvents.CREATE_PIPELINE_STATUS, pipelineManager);
     registerMXBean();
     registerMetricsSource(this);
   }
@@ -402,8 +403,7 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
       pipelineManager = configurator.getPipelineManager();
     } else {
       pipelineManager =
-          new SCMPipelineManager(conf, scmNodeManager, eventQueue,
-              grpcTlsConfig);
+          new SCMPipelineManager(conf, scmNodeManager, eventQueue, this);
     }
 
     if (configurator.getContainerManager() != null) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
index 37321d7..46e8a51 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java
@@ -33,7 +33,9 @@ import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
 import org.apache.hadoop.hdds.scm.server
     .SCMDatanodeHeartbeatDispatcher.PipelineActionsFromDatanode;
@@ -41,6 +43,12 @@ import org.apache.hadoop.hdds.scm.server
     .SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
+    .CreatePipelineStatus;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.CreatePipelineACKProto;
+import org.apache.hadoop.hdds.protocol.proto.
+    StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
 
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
@@ -360,6 +368,32 @@ public final class TestUtils {
     return new PipelineReportFromDatanode(dn, reportBuilder.build());
   }
 
+  public static CreatePipelineStatus getPipelineCreateStatusFromDatanode(
+      DatanodeDetails dn, PipelineID pipelineID) {
+    CreatePipelineACKProto ack = CreatePipelineACKProto.newBuilder()
+        .setPipelineID(pipelineID.getProtobuf())
+        .setDatanodeUUID(dn.getUuidString())
+        .build();
+    CommandStatus status = CommandStatus.newBuilder()
+        .setCreatePipelineAck(ack)
+        .setStatus(CommandStatus.Status.EXECUTED)
+        .setCmdId(0L)
+        .setType(Type.createPipelineCommand)
+        .build();
+    return new CreatePipelineStatus(status);
+  }
+
+  public static void openAllRatisPipelines(PipelineManager pipelineManager)
+      throws IOException {
+    // Pipelines are created by a background thread
+    List<Pipeline> pipelines =
+        pipelineManager.getPipelines(HddsProtos.ReplicationType.RATIS);
+    // Open each pipeline to move it from ALLOCATED to OPEN state
+    for (Pipeline pipeline : pipelines) {
+      pipelineManager.openPipeline(pipeline.getId());
+    }
+  }
+
   public static PipelineActionsFromDatanode getPipelineActionFromDatanode(
       DatanodeDetails dn, PipelineID... pipelineIDs) {
     PipelineActionsProto.Builder actionsProtoBuilder =
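
The two helpers above are designed to be combined: first open the background-created pipelines, then replay a per-datanode creation ack into the pipeline manager for each member. A sketch of that flow (hypothetical companion helper, assumed to live alongside the utilities in TestUtils; the same pattern appears in TestSCMPipelineManager further below):

    // Hypothetical helper; pipelineManager and eventQueue come from the
    // calling test's fixture.
    public static void ackPipelineOnAllNodes(SCMPipelineManager pipelineManager,
        Pipeline pipeline, EventQueue eventQueue) {
      for (DatanodeDetails dn : pipeline.getNodes()) {
        CreatePipelineStatus ack =
            getPipelineCreateStatusFromDatanode(dn, pipeline.getId());
        // Consumed exactly like a heartbeat-borne command status report.
        pipelineManager.onMessage(ack, eventQueue);
      }
    }
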
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index a8364a4..ba3d4f4 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
@@ -80,6 +81,8 @@ public class TestCloseContainerEventHandler {
     eventQueue.addHandler(CLOSE_CONTAINER,
         new CloseContainerEventHandler(pipelineManager, containerManager));
     eventQueue.addHandler(DATANODE_COMMAND, nodeManager);
+    // Move all pipelines created by the background thread from ALLOCATED to OPEN
+    TestUtils.openAllRatisPipelines(pipelineManager);
   }
 
   @AfterClass
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 7657b54..14c24e0 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -73,6 +73,7 @@ public class TestDeadNodeHandler {
   private SCMNodeManager nodeManager;
   private ContainerManager containerManager;
   private NodeReportHandler nodeReportHandler;
+  private SCMPipelineManager pipelineManager;
   private DeadNodeHandler deadNodeHandler;
   private EventPublisher publisher;
   private EventQueue eventQueue;
@@ -87,12 +88,12 @@ public class TestDeadNodeHandler {
     eventQueue = new EventQueue();
     scm = HddsTestUtils.getScm(conf);
     nodeManager = (SCMNodeManager) scm.getScmNodeManager();
-    SCMPipelineManager manager =
+    pipelineManager =
         (SCMPipelineManager)scm.getPipelineManager();
     PipelineProvider mockRatisProvider =
-        new MockRatisPipelineProvider(nodeManager, manager.getStateManager(),
-            conf);
-    manager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
+        new MockRatisPipelineProvider(nodeManager,
+            pipelineManager.getStateManager(), conf);
+    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
     containerManager = scm.getContainerManager();
     deadNodeHandler = new DeadNodeHandler(nodeManager,
@@ -147,6 +148,8 @@ public class TestDeadNodeHandler {
     nodeManager.register(TestUtils.randomDatanodeDetails(),
         TestUtils.createNodeReport(storageOne), null);
 
+    TestUtils.openAllRatisPipelines(pipelineManager);
+
     ContainerInfo container1 =
         TestUtils.allocateContainer(containerManager);
     ContainerInfo container2 =
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index d028851..dd61102 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -121,6 +121,7 @@ public class TestSCMNodeManager {
         testDir.getAbsolutePath());
     conf.setTimeDuration(OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL, 100,
         TimeUnit.MILLISECONDS);
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     return conf;
   }
 
@@ -1035,9 +1036,11 @@ public class TestSCMNodeManager {
       eq.processAll(1000L);
       List<SCMCommand> command =
           nodemanager.processHeartbeat(datanodeDetails);
-      Assert.assertEquals(1, command.size());
-      Assert
-          .assertEquals(command.get(0).getClass(), CloseContainerCommand.class);
+      // With the dn registered, SCM may also queue a create pipeline command
+      // for it, so the heartbeat can return more than one command.
+      Assert.assertTrue(command.size() >= 1);
+      Assert.assertTrue(command.stream().anyMatch(
+          c -> c.getClass().equals(CloseContainerCommand.class)));
     } catch (IOException e) {
       e.printStackTrace();
       throw  e;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 01c53ba..28a3484 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.server.events.EventQueue;
 
 import java.io.IOException;
 
@@ -31,7 +32,7 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
   public MockRatisPipelineProvider(NodeManager nodeManager,
                             PipelineStateManager stateManager,
                             Configuration conf) {
-    super(nodeManager, stateManager, conf, null);
+    super(nodeManager, stateManager, conf, new EventQueue());
   }
 
   protected void initializePipeline(Pipeline pipeline) throws IOException {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index 94c3039..639fc9a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -22,10 +22,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.
-    StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -34,7 +30,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -68,7 +63,8 @@ public class TestHealthyPipelineSafeModeRule {
       // enable pipeline check
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+      config.setBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
           nodeManager, eventQueue, null);
@@ -88,10 +84,8 @@ public class TestHealthyPipelineSafeModeRule {
     } finally {
       FileUtil.fullyDelete(new File(storageDir));
     }
-
   }
 
-
   @Test
   public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception {
 
@@ -113,7 +107,8 @@ public class TestHealthyPipelineSafeModeRule {
       // enable pipeline check
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+      config.setBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
           nodeManager, eventQueue, null);
@@ -162,7 +157,6 @@ public class TestHealthyPipelineSafeModeRule {
     } finally {
       FileUtil.fullyDelete(new File(storageDir));
     }
-
   }
 
 
@@ -188,7 +182,8 @@ public class TestHealthyPipelineSafeModeRule {
       // enable pipeline check
       config.setBoolean(
           HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
-
+      config.setBoolean(
+          HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
       SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
           nodeManager, eventQueue, null);
@@ -217,7 +212,7 @@ public class TestHealthyPipelineSafeModeRule {
           scmSafeModeManager.getHealthyPipelineSafeModeRule();
 
 
-      // No datanodes have sent pipelinereport from datanode
+      // No pipeline event has been sent to the SCMSafeModeManager yet
       Assert.assertFalse(healthyPipelineSafeModeRule.validate());
 
 
@@ -225,14 +220,14 @@ public class TestHealthyPipelineSafeModeRule {
           GenericTestUtils.LogCapturer.captureLogs(LoggerFactory.getLogger(
               SCMSafeModeManager.class));
 
-      // fire event with pipeline report with ratis type and factor 1
-      // pipeline, validate() should return false
+      // fire an OPEN_PIPELINE event for the ratis type, factor ONE
+      // pipeline; after this first event, validate() should return true
       firePipelineEvent(pipeline1, eventQueue);
 
       GenericTestUtils.waitFor(() -> logCapturer.getOutput().contains(
-          "reported count is 0"),
+          "reported count is 1"),
           1000, 5000);
-      Assert.assertFalse(healthyPipelineSafeModeRule.validate());
+      Assert.assertTrue(healthyPipelineSafeModeRule.validate());
 
       firePipelineEvent(pipeline2, eventQueue);
       firePipelineEvent(pipeline3, eventQueue);
@@ -246,19 +241,7 @@ public class TestHealthyPipelineSafeModeRule {
 
   }
 
-
   private void firePipelineEvent(Pipeline pipeline, EventQueue eventQueue) {
-    PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
-        .newBuilder();
-
-    reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-        .setPipelineID(pipeline.getId().getProtobuf()));
-
-    // Here no need to fire event from 3 nodes, as already pipeline is in
-    // open state, but doing it.
-    eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-        new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-            pipeline.getNodes().get(0), reportBuilder.build()));
+    eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
   }
-
 }
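
The simplified helper above captures the new contract: safe mode rules now listen for a single OPEN_PIPELINE event per pipeline instead of per-datanode pipeline reports. Condensed, the fire-then-validate flow these tests rely on looks like this (a sketch reusing the names from the test above):

    // Sketch: one OPEN_PIPELINE event per pipeline, then poll the rule.
    eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline1);
    GenericTestUtils.waitFor(
        () -> healthyPipelineSafeModeRule.validate(), // re-checked each poll
        1000,   // check interval in ms
        5000);  // timeout in ms
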
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index ca54d05..7f8f0db 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@ -20,10 +20,6 @@ package org.apache.hadoop.hdds.scm.safemode;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -32,7 +28,6 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -63,6 +58,8 @@ public class TestOneReplicaPipelineSafeModeRule {
         HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
     ozoneConfiguration.set(HddsConfigKeys.OZONE_METADATA_DIRS,
         folder.newFolder().toString());
+    ozoneConfiguration.setBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
 
     List<ContainerInfo> containers = new ArrayList<>();
     containers.addAll(HddsTestUtils.getContainerInfo(1));
@@ -123,7 +120,6 @@ public class TestOneReplicaPipelineSafeModeRule {
     firePipelineEvent(pipelines.get(pipelineFactorThreeCount - 1));
 
     GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
-
   }
 
 
@@ -170,11 +166,8 @@ public class TestOneReplicaPipelineSafeModeRule {
     firePipelineEvent(pipelines.get(pipelineCountThree - 1));
 
     GenericTestUtils.waitFor(() -> rule.validate(), 1000, 5000);
-
   }
 
-
-
   private void createPipelines(int count,
       HddsProtos.ReplicationFactor factor) throws Exception {
     for (int i = 0; i < count; i++) {
@@ -184,26 +177,6 @@ public class TestOneReplicaPipelineSafeModeRule {
   }
 
   private void firePipelineEvent(Pipeline pipeline) {
-    PipelineReportsProto.Builder reportBuilder =
-        PipelineReportsProto.newBuilder();
-
-    reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-        .setPipelineID(pipeline.getId().getProtobuf()));
-
-    if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) {
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(0), reportBuilder.build()));
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(1), reportBuilder.build()));
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(2), reportBuilder.build()));
-    } else {
-      eventQueue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode(
-              pipeline.getNodes().get(0), reportBuilder.build()));
-    }
+    eventQueue.fireEvent(SCMEvents.OPEN_PIPELINE, pipeline);
   }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index ba92035..0de0a73 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -31,8 +31,6 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
 import org.apache.hadoop.hdds.scm.HddsTestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -42,7 +40,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.Assert;
@@ -175,7 +172,7 @@ public class TestSCMSafeModeManager {
         HDDS_SCM_SAFEMODE_HEALTHY_PIPELINE_THRESHOLD_PCT, healthyPercent);
     conf.setDouble(HddsConfigKeys.
         HDDS_SCM_SAFEMODE_ONE_NODE_REPORTED_PIPELINE_PCT, oneReplicaPercent);
-
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
     return conf;
   }
 
@@ -300,12 +297,12 @@ public class TestSCMSafeModeManager {
     // we shall get an event when a datanode is registered. In that case,
     // validate will return true, and this rule is added to validatedRules.
     if (Math.max(healthyPipelinePercent, oneReplicaThresholdCount) == 0) {
-      firePipelineEvent(pipelines.get(0));
+      firePipelineEvent(pipelineManager, pipelines.get(0));
     }
 
     for (int i = 0; i < Math.max(healthyPipelineThresholdCount,
-        oneReplicaThresholdCount); i++) {
-      firePipelineEvent(pipelines.get(i));
+        Math.min(oneReplicaThresholdCount, pipelines.size())); i++) {
+      firePipelineEvent(pipelineManager, pipelines.get(i));
 
       if (i < healthyPipelineThresholdCount) {
         checkHealthy(i + 1);
@@ -350,15 +347,11 @@ public class TestSCMSafeModeManager {
         1000,  5000);
   }
 
-  private void firePipelineEvent(Pipeline pipeline) throws Exception {
-    PipelineReportsProto.Builder reportBuilder =
-        PipelineReportsProto.newBuilder();
-
-    reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-        .setPipelineID(pipeline.getId().getProtobuf()));
-    queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-        new PipelineReportFromDatanode(pipeline.getNodes().get(0),
-            reportBuilder.build()));
+  private void firePipelineEvent(SCMPipelineManager pipelineManager,
+      Pipeline pipeline) throws Exception {
+    pipelineManager.openPipeline(pipeline.getId());
+    queue.fireEvent(SCMEvents.OPEN_PIPELINE,
+        pipelineManager.getPipeline(pipeline.getId()));
   }
 
 
@@ -488,10 +481,6 @@ public class TestSCMSafeModeManager {
       Pipeline pipeline = pipelineManager.createPipeline(
           HddsProtos.ReplicationType.RATIS,
           HddsProtos.ReplicationFactor.THREE);
-      PipelineReportsProto.Builder reportBuilder = PipelineReportsProto
-          .newBuilder();
-      reportBuilder.addPipelineReport(PipelineReport.newBuilder()
-          .setPipelineID(pipeline.getId().getProtobuf()));
 
       scmSafeModeManager = new SCMSafeModeManager(
           config, containers, pipelineManager, queue);
@@ -500,10 +489,7 @@
           HddsTestUtils.createNodeRegistrationContainerReport(containers));
       assertTrue(scmSafeModeManager.getInSafeMode());
 
-      // Trigger the processed pipeline report event
-      queue.fireEvent(SCMEvents.PROCESSED_PIPELINE_REPORT,
-          new PipelineReportFromDatanode(pipeline.getNodes().get(0),
-              reportBuilder.build()));
+      firePipelineEvent(pipelineManager, pipeline);
 
       GenericTestUtils.waitFor(() -> {
         return !scmSafeModeManager.getInSafeMode();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
index e4f1a37..378a1a6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
@@ -164,7 +164,9 @@ public class TestContainerStateManagerIntegration {
       }
     }
 
-    cluster.restartStorageContainerManager(true);
+    // Restarting the SCM does not re-trigger the container reports needed to
+    // satisfy the safe mode exit rule, so skip waiting for safe mode exit.
+    cluster.restartStorageContainerManager(false);
 
     List<ContainerInfo> result = cluster.getStorageContainerManager()
         .getContainerManager().listContainer(null, 100);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index c583559..210dbb2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -81,7 +81,7 @@ public class TestPipelineClose {
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
     conf.setTimeDuration(HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL, 1000,
         TimeUnit.MILLISECONDS);
-    pipelineDestroyTimeoutInMillis = 5000;
+    pipelineDestroyTimeoutInMillis = 10000;
     conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
         pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS);
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 00144e4..dedc56a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.commons.collections.CollectionUtils;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -45,7 +46,9 @@ public class TestRatisPipelineProvider {
   @Before
   public void init() throws Exception {
     nodeManager = new MockNodeManager(true, 10);
-    stateManager = new PipelineStateManager(new OzoneConfiguration());
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+    stateManager = new PipelineStateManager(conf);
     provider = new MockRatisPipelineProvider(nodeManager,
         stateManager, new OzoneConfiguration());
   }
@@ -57,7 +60,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
-            Pipeline.PipelineState.OPEN);
+            Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
     Pipeline pipeline1 = provider.create(factor);
     stateManager.addPipeline(pipeline1);
@@ -68,7 +71,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline1.getFactor(), factor);
     Assert.assertEquals(pipeline1.getPipelineState(),
-            Pipeline.PipelineState.OPEN);
+            Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
   }
 
@@ -80,7 +83,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
-        Pipeline.PipelineState.OPEN);
+        Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
 
     factor = HddsProtos.ReplicationFactor.ONE;
@@ -94,7 +97,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline1.getFactor(), factor);
     Assert.assertEquals(pipeline1.getPipelineState(),
-        Pipeline.PipelineState.OPEN);
+        Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
   }
 
@@ -183,7 +186,7 @@ public class TestRatisPipelineProvider {
     Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
     Assert.assertEquals(pipeline.getFactor(), factor);
     Assert.assertEquals(pipeline.getPipelineState(),
-        Pipeline.PipelineState.OPEN);
+        Pipeline.PipelineState.ALLOCATED);
     Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
     List<DatanodeDetails> pipelineNodes = pipeline.getNodes();
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 2a486b1..2e1fe9c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -28,10 +28,10 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
+    .CreatePipelineStatus;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -42,7 +42,6 @@ import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -61,6 +60,8 @@ public class TestSCMPipelineManager {
     testDir = GenericTestUtils
         .getTestDir(TestSCMPipelineManager.class.getSimpleName());
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    conf.set(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK,
+        "false");
     boolean folderExisted = testDir.exists() || testDir.mkdirs();
     if (!folderExisted) {
       throw new IOException("Unable to create test directory path");
@@ -157,30 +158,24 @@ public class TestSCMPipelineManager {
     pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
         mockRatisProvider);
 
-    SCMSafeModeManager scmSafeModeManager =
-        new SCMSafeModeManager(new OzoneConfiguration(),
-            new ArrayList<>(), pipelineManager, eventQueue);
-
     // create a pipeline in allocated state with no dns yet reported
     Pipeline pipeline = pipelineManager
         .createPipeline(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE);
+
     Assert
         .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
     Assert
-        .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
+        .assertFalse(pipelineManager.getPipeline(pipeline.getId()).isOpen());
 
-    // get pipeline report from each dn in the pipeline
-    PipelineReportHandler pipelineReportHandler =
-        new PipelineReportHandler(scmSafeModeManager, pipelineManager, conf);
+    // get pipeline create status from each dn in the pipeline
     for (DatanodeDetails dn: pipeline.getNodes()) {
-      PipelineReportFromDatanode pipelineReportFromDatanode =
-          TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId());
       // pipeline is not healthy until all dns report
       Assert.assertFalse(
           pipelineManager.getPipeline(pipeline.getId()).isHealthy());
-      pipelineReportHandler
-          .onMessage(pipelineReportFromDatanode, new EventQueue());
+      CreatePipelineStatus response =
+          TestUtils.getPipelineCreateStatusFromDatanode(dn, pipeline.getId());
+      pipelineManager.onMessage(response, eventQueue);
     }
 
     // pipeline is healthy when all dns report
@@ -194,11 +189,10 @@ public class TestSCMPipelineManager {
     pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
 
     for (DatanodeDetails dn: pipeline.getNodes()) {
-      PipelineReportFromDatanode pipelineReportFromDatanode =
-          TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId());
-      // pipeline report for destroyed pipeline should be ignored
-      pipelineReportHandler
-          .onMessage(pipelineReportFromDatanode, new EventQueue());
+      CreatePipelineStatus response =
+          TestUtils.getPipelineCreateStatusFromDatanode(dn, pipeline.getId());
+      // pipeline create status for destroyed pipeline should be ignored
+      pipelineManager.onMessage(response, new EventQueue());
     }
 
     try {
@@ -226,9 +220,9 @@ public class TestSCMPipelineManager {
 
     MetricsRecordBuilder metrics = getMetrics(
         SCMPipelineMetrics.class.getSimpleName());
-    long numPipelineCreated = getLongCounter("NumPipelineCreated",
+    long numPipelineAllocated = getLongCounter("NumPipelineAllocated",
         metrics);
-    Assert.assertTrue(numPipelineCreated == 0);
+    Assert.assertTrue(numPipelineAllocated == 0);
 
     // 3 DNs are unhealthy.
     // Create 5 pipelines (Use up 15 Datanodes)
@@ -241,8 +235,8 @@ public class TestSCMPipelineManager {
 
     metrics = getMetrics(
         SCMPipelineMetrics.class.getSimpleName());
-    numPipelineCreated = getLongCounter("NumPipelineCreated", metrics);
-    Assert.assertTrue(numPipelineCreated == 5);
+    numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+    Assert.assertTrue(numPipelineAllocated == 5);
 
     long numPipelineCreateFailed = getLongCounter(
         "NumPipelineCreationFailed", metrics);
@@ -261,8 +255,8 @@ public class TestSCMPipelineManager {
 
     metrics = getMetrics(
         SCMPipelineMetrics.class.getSimpleName());
-    numPipelineCreated = getLongCounter("NumPipelineCreated", metrics);
-    Assert.assertTrue(numPipelineCreated == 5);
+    numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
+    Assert.assertTrue(numPipelineAllocated == 5);
 
     numPipelineCreateFailed = getLongCounter(
         "NumPipelineCreationFailed", metrics);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index ac76482..9845abb 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -65,6 +65,7 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_HEARTBEAT_INTERVAL;
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState
     .HEALTHY;
@@ -143,11 +144,16 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       throws TimeoutException, InterruptedException {
     GenericTestUtils.waitFor(() -> {
       final int healthy = scm.getNodeCount(HEALTHY);
-      final boolean isReady = healthy == hddsDatanodes.size();
+      final boolean isNodeReady = healthy == hddsDatanodes.size();
+      final boolean exitSafeMode = !scm.isInSafeMode();
+
       LOG.info("{}. Got {} of {} DN Heartbeats.",
-          isReady? "Cluster is ready" : "Waiting for cluster to be ready",
+          isNodeReady? "Nodes are ready" : "Waiting for nodes to be ready",
+          healthy, hddsDatanodes.size());
+      LOG.info(exitSafeMode? "Cluster exited safe mode, {} of {} DNs healthy."
+              : "Waiting for cluster to exit safe mode, {} of {} DNs healthy.",
           healthy, hddsDatanodes.size());
-      return isReady;
+      return isNodeReady && exitSafeMode;
     }, 1000, waitForClusterToBeReadyTimeout);
   }
 
@@ -615,11 +621,15 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       if (hbInterval.isPresent()) {
         conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
             hbInterval.get(), TimeUnit.MILLISECONDS);
+        conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL,
+            hbInterval.get(), TimeUnit.MILLISECONDS);
 
       } else {
         conf.setTimeDuration(HDDS_HEARTBEAT_INTERVAL,
             DEFAULT_HB_INTERVAL_MS,
             TimeUnit.MILLISECONDS);
+        conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL,
+            DEFAULT_HB_INTERVAL_MS, TimeUnit.MILLISECONDS);
       }
 
       if (hbProcessorInterval.isPresent()) {
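
With safe mode now gating readiness, startup waits on two conditions at once: all datanodes heartbeating and the SCM out of safe mode. The pattern generalizes to any compound readiness check; a plain-Java sketch without the Hadoop test utilities (hypothetical helper, not part of MiniOzoneCluster):

    // Hypothetical stand-in for the GenericTestUtils.waitFor call above.
    static void waitUntil(java.util.function.BooleanSupplier ready,
        long intervalMs, long timeoutMs) throws InterruptedException {
      long deadline = System.currentTimeMillis() + timeoutMs;
      while (!ready.getAsBoolean()) {
        if (System.currentTimeMillis() > deadline) {
          throw new IllegalStateException("timed out waiting for readiness");
        }
        Thread.sleep(intervalMs); // poll at the configured interval
      }
    }

    // Usage mirroring waitForClusterToBeReady():
    //   waitUntil(() -> allNodesHealthy() && !scm.isInSafeMode(), 1000, 60000);
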

