Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2018/09/06 21:36:21 UTC

[1/2] hadoop git commit: HDDS-297. Add pipeline actions in Ozone. Contributed by Mukul Kumar Singh and Shashikant Banerjee

Repository: hadoop
Updated Branches:
  refs/heads/trunk fa2945e7a -> b3161c4dd


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
new file mode 100644
index 0000000..7e3969c
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNodeFailure.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerMapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers
+    .ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
+    .ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
+    .ReplicationType.RATIS;
+
+/**
+ * Test Node failure detection and handling in Ratis.
+ */
+public class TestNodeFailure {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneConfiguration conf;
+  private static ContainerWithPipeline ratisContainer1;
+  private static ContainerWithPipeline ratisContainer2;
+  private static ContainerMapping mapping;
+  private static long timeForFailure;
+
+  /**
+   * Create a MiniOzoneCluster for testing.
+   *
+   * @throws Exception if the cluster cannot be started.
+   */
+  @BeforeClass
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
+        10, TimeUnit.SECONDS);
+    conf.setTimeDuration(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT,
+        10, TimeUnit.SECONDS);
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(6)
+        .setHbInterval(1000)
+        .setHbProcessorInterval(1000)
+        .build();
+    cluster.waitForClusterToBeReady();
+    StorageContainerManager scm = cluster.getStorageContainerManager();
+    mapping = (ContainerMapping)scm.getScmContainerManager();
+    ratisContainer1 = mapping.allocateContainer(RATIS, THREE, "testOwner");
+    ratisContainer2 = mapping.allocateContainer(RATIS, THREE, "testOwner");
+    // At this stage, there should be 2 pipelines with 1 open container each.
+    // Try closing both the pipelines, one with a closed container and
+    // the other with an open container.
+    timeForFailure = conf.getTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
+        OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT
+            .getDuration(), TimeUnit.MILLISECONDS);
+  }
+
+  /**
+   * Shutdown the MiniOzoneCluster.
+   */
+  @AfterClass
+  public static void shutdown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testPipelineFail() throws InterruptedException, IOException,
+      TimeoutException {
+    Assert.assertEquals(ratisContainer1.getPipeline().getLifeCycleState(),
+        HddsProtos.LifeCycleState.OPEN);
+    Pipeline pipelineToFail = ratisContainer1.getPipeline();
+    DatanodeDetails dnToFail = pipelineToFail.getMachines().get(0);
+    cluster.shutdownHddsDatanode(dnToFail);
+
+    // wait for sufficient time for the callback to be triggered
+    Thread.sleep(3 * timeForFailure);
+
+    Assert.assertEquals(HddsProtos.LifeCycleState.CLOSED,
+        ratisContainer1.getPipeline().getLifeCycleState());
+    Assert.assertEquals(HddsProtos.LifeCycleState.OPEN,
+        ratisContainer2.getPipeline().getLifeCycleState());
+    Assert.assertNull(
+        mapping.getPipelineSelector().getPipeline(pipelineToFail.getId()));
+    // Now restart the datanode and make sure that a new pipeline is created.
+    cluster.restartHddsDatanode(dnToFail);
+    ContainerWithPipeline ratisContainer3 =
+        mapping.allocateContainer(RATIS, THREE, "testOwner");
+    // Assert that the new container is not created on the ratisContainer2 pipeline.
+    Assert.assertNotEquals(ratisContainer3.getPipeline().getId(),
+        ratisContainer2.getPipeline().getId());
+  }
+}
\ No newline at end of file
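
The fixed Thread.sleep(3 * timeForFailure) above can be flaky on a slow or
loaded machine. Below is a minimal polling sketch, not part of the patch,
that a test like this could use instead; it assumes only the classes already
imported by TestNodeFailure.

  private static void waitForPipelineClose(ContainerWithPipeline container,
      long timeoutMs) throws InterruptedException, TimeoutException {
    long deadline = System.currentTimeMillis() + timeoutMs;
    // Poll the pipeline state instead of sleeping for a fixed multiple
    // of the failure timeout.
    while (container.getPipeline().getLifeCycleState()
        != HddsProtos.LifeCycleState.CLOSED) {
      if (System.currentTimeMillis() > deadline) {
        throw new TimeoutException("Pipeline "
            + container.getPipeline().getId() + " not closed after "
            + timeoutMs + "ms");
      }
      Thread.sleep(100);
    }
  }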

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index eb15396..0f8f925 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -112,8 +112,7 @@ public class TestPipelineClose {
 
     pipelineSelector.finalizePipeline(ratisContainer1.getPipeline());
     Pipeline pipeline1 = pipelineSelector
-        .getPipeline(ratisContainer1.getPipeline().getId(),
-            ratisContainer1.getContainerInfo().getReplicationType());
+        .getPipeline(ratisContainer1.getPipeline().getId());
     Assert.assertNull(pipeline1);
     Assert.assertEquals(ratisContainer1.getPipeline().getLifeCycleState(),
         HddsProtos.LifeCycleState.CLOSED);
@@ -140,8 +139,7 @@ public class TestPipelineClose {
     Assert.assertEquals(ratisContainer2.getPipeline().getLifeCycleState(),
         HddsProtos.LifeCycleState.CLOSING);
     Pipeline pipeline2 = pipelineSelector
-        .getPipeline(ratisContainer2.getPipeline().getId(),
-            ratisContainer2.getContainerInfo().getReplicationType());
+        .getPipeline(ratisContainer2.getPipeline().getId());
     Assert.assertEquals(pipeline2.getLifeCycleState(),
         HddsProtos.LifeCycleState.CLOSING);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index ae6a91e..e11cf9b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.client.OzoneClient;
 import org.apache.hadoop.ozone.om.OzoneManager;
@@ -156,6 +157,13 @@ public interface MiniOzoneCluster {
       TimeoutException;
 
   /**
+   * Restart a particular HddsDatanode.
+   *
+   * @param dn HddsDatanode in the MiniOzoneCluster
+   */
+  void restartHddsDatanode(DatanodeDetails dn) throws InterruptedException,
+      TimeoutException, IOException;
+  /**
    * Shutdown a particular HddsDatanode.
    *
    * @param i index of HddsDatanode in the MiniOzoneCluster
@@ -163,6 +171,13 @@ public interface MiniOzoneCluster {
   void shutdownHddsDatanode(int i);
 
   /**
+   * Shutdown a particular HddsDatanode.
+   *
+   * @param dn HddsDatanode in the MiniOzoneCluster
+   */
+  void shutdownHddsDatanode(DatanodeDetails dn) throws IOException;
+
+  /**
    * Shutdown the MiniOzoneCluster.
    */
   void shutdown();
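
A hedged usage sketch for the two new DatanodeDetails-based methods, in the
style of TestNodeFailure above; getHddsDatanodes() is assumed to be the
existing accessor on the cluster.

  MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
      .setNumDatanodes(3)
      .build();
  cluster.waitForClusterToBeReady();
  DatanodeDetails dn =
      cluster.getHddsDatanodes().get(0).getDatanodeDetails();
  cluster.shutdownHddsDatanode(dn);  // stop by identity instead of index
  cluster.restartHddsDatanode(dn);   // restart the same datanode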

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index e06e2f6..7b9bb0e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -157,6 +157,16 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
     return hddsDatanodes;
   }
 
+  private int getHddsDatanodeIndex(DatanodeDetails dn) throws IOException {
+    for (HddsDatanodeService service : hddsDatanodes) {
+      if (service.getDatanodeDetails().equals(dn)) {
+        return hddsDatanodes.indexOf(service);
+      }
+    }
+    throw new IOException(
+        "Not able to find datanode with datanode Id " + dn.getUuid());
+  }
+
   @Override
   public OzoneClient getClient() throws IOException {
     return OzoneClientFactory.getClient(conf);
@@ -243,11 +253,22 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
   }
 
   @Override
+  public void restartHddsDatanode(DatanodeDetails dn)
+      throws InterruptedException, TimeoutException, IOException {
+    restartHddsDatanode(getHddsDatanodeIndex(dn));
+  }
+
+  @Override
   public void shutdownHddsDatanode(int i) {
     hddsDatanodes.get(i).stop();
   }
 
   @Override
+  public void shutdownHddsDatanode(DatanodeDetails dn) throws IOException {
+    shutdownHddsDatanode(getHddsDatanodeIndex(dn));
+  }
+
+  @Override
   public void shutdown() {
     try {
       LOG.info("Shutting down the Mini Ozone Cluster");
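
getHddsDatanodeIndex above scans the list once to find the matching service
and then scans again via indexOf. A hedged single-pass alternative, not in
the patch:

  private int getHddsDatanodeIndex(DatanodeDetails dn) throws IOException {
    // One pass over the list; return the position directly instead of
    // calling indexOf on the matched element.
    for (int i = 0; i < hddsDatanodes.size(); i++) {
      if (hddsDatanodes.get(i).getDatanodeDetails().equals(dn)) {
        return i;
      }
    }
    throw new IOException(
        "Not able to find datanode with datanode Id " + dn.getUuid());
  }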

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
index 8b324b5..b53e683 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
@@ -156,7 +156,8 @@ public class TestCSMMetrics {
     conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
 
     final ContainerDispatcher dispatcher = new TestContainerDispatcher();
-    return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher);
+    return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher,
+        null);
   }
 
   static void initXceiverServerRatis(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index ebcc930..3abc8f8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -138,7 +138,8 @@ public class TestContainerServer {
     conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR, dir);
 
     final ContainerDispatcher dispatcher = new TestContainerDispatcher();
-    return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher);
+    return XceiverServerRatis
+        .newXceiverServerRatis(dn, conf, dispatcher, null);
   }
 
   static void initXceiverServerRatis(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
index 448742e..8c83fd3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerStateMachine.java
@@ -69,7 +69,7 @@ public class TestContainerStateMachine {
           new ArrayBlockingQueue<>(1024),
           new ThreadPoolExecutor.CallerRunsPolicy());
   private ContainerStateMachine stateMachine =
-      new ContainerStateMachine(new TestContainerDispatcher(), executor);
+      new ContainerStateMachine(new TestContainerDispatcher(), executor, null);
 
 
   @Test




[2/2] hadoop git commit: HDDS-297. Add pipeline actions in Ozone. Contributed by Mukul Kumar Singh and Shashikant Banerjee

Posted by sz...@apache.org.
HDDS-297. Add pipeline actions in Ozone.  Contributed by Mukul Kumar Singh and Shashikant Banerjee


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b3161c4d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b3161c4d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b3161c4d

Branch: refs/heads/trunk
Commit: b3161c4dd9367c68b30528a63c03756eaa32aaf9
Parents: fa2945e
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
Authored: Thu Sep 6 14:35:07 2018 -0700
Committer: Tsz Wo Nicholas Sze <sz...@apache.org>
Committed: Thu Sep 6 14:35:07 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdds/scm/XceiverClient.java   |   8 +-
 .../hadoop/hdds/scm/XceiverClientGrpc.java      |   9 +-
 .../hadoop/hdds/scm/XceiverClientRatis.java     |  36 +++-
 .../scm/client/ContainerOperationClient.java    |   2 +-
 .../org/apache/hadoop/hdds/HddsConfigKeys.java  |   5 +
 .../apache/hadoop/hdds/scm/ScmConfigKeys.java   |  11 +
 .../hadoop/hdds/scm/XceiverClientSpi.java       |  10 +-
 .../apache/hadoop/ozone/OzoneConfigKeys.java    |  12 ++
 .../main/java/org/apache/ratis/RatisHelper.java |  22 +-
 .../common/src/main/resources/ozone-default.xml |  27 +++
 .../common/statemachine/StateContext.java       |  45 ++++
 .../states/endpoint/HeartbeatEndpointTask.java  |  28 +++
 .../server/ratis/ContainerStateMachine.java     |  17 +-
 .../server/ratis/XceiverServerRatis.java        | 205 ++++++++++++++-----
 .../container/ozoneimpl/OzoneContainer.java     |   2 +-
 .../StorageContainerDatanodeProtocol.proto      |  26 +++
 .../hdds/scm/container/ContainerMapping.java    |  27 ++-
 .../scm/container/ContainerStateManager.java    |   5 +-
 .../hadoop/hdds/scm/container/Mapping.java      |  14 ++
 .../hadoop/hdds/scm/events/SCMEvents.java       |  24 ++-
 .../hadoop/hdds/scm/node/StaleNodeHandler.java  |  16 +-
 .../hdds/scm/pipelines/Node2PipelineMap.java    |  34 +--
 .../pipelines/PipelineActionEventHandler.java   |  60 ++++++
 .../scm/pipelines/PipelineCloseHandler.java     |  38 ++++
 .../hdds/scm/pipelines/PipelineManager.java     |  10 +-
 .../hdds/scm/pipelines/PipelineSelector.java    |  46 ++---
 .../scm/pipelines/ratis/RatisManagerImpl.java   |  14 +-
 .../standalone/StandaloneManagerImpl.java       |   7 +-
 .../server/SCMDatanodeHeartbeatDispatcher.java  |  23 +++
 .../scm/server/StorageContainerManager.java     |  13 +-
 .../hdds/scm/pipeline/TestNode2PipelineMap.java |   6 +-
 .../hdds/scm/pipeline/TestNodeFailure.java      | 126 ++++++++++++
 .../hdds/scm/pipeline/TestPipelineClose.java    |   6 +-
 .../apache/hadoop/ozone/MiniOzoneCluster.java   |  15 ++
 .../hadoop/ozone/MiniOzoneClusterImpl.java      |  21 ++
 .../transport/server/ratis/TestCSMMetrics.java  |   3 +-
 .../container/server/TestContainerServer.java   |   3 +-
 .../server/TestContainerStateMachine.java       |   2 +-
 38 files changed, 815 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
index 5022618..5f2fe26 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClient.java
@@ -186,15 +186,17 @@ public class XceiverClient extends XceiverClientSpi {
 
   /**
    * Create a pipeline.
-   *
-   * @param ignored -  pipeline to be created.
    */
   @Override
-  public void createPipeline(Pipeline ignored)
+  public void createPipeline()
       throws IOException {
     // For stand alone pipeline, there is no notion called setup pipeline.
   }
 
+  public void destroyPipeline() {
+    // For a standalone pipeline, there is no notion of destroying a pipeline.
+  }
+
   /**
    * Returns pipeline Type.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 1622ddb..3cdbc7c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -216,15 +216,16 @@ public class XceiverClientGrpc extends XceiverClientSpi {
 
   /**
    * Create a pipeline.
-   *
-   * @param ignored -  pipeline to be created.
    */
   @Override
-  public void createPipeline(Pipeline ignored)
-      throws IOException {
+  public void createPipeline() {
     // For stand alone pipeline, there is no notion called setup pipeline.
   }
 
+  public void destroyPipeline() {
+    // For a standalone pipeline, there is no notion of destroying a pipeline.
+  }
+
   /**
    * Returns pipeline Type.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 2cb319f..499f94d 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -88,13 +88,27 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   /**
    * {@inheritDoc}
    */
-  public void createPipeline(Pipeline pipeline)
+  public void createPipeline()
       throws IOException {
     RaftGroupId groupId = pipeline.getId().getRaftGroupID();
     RaftGroup group = RatisHelper.newRaftGroup(groupId, pipeline.getMachines());
     LOG.debug("initializing pipeline:{} with nodes:{}",
         pipeline.getId(), group.getPeers());
-    reinitialize(pipeline.getMachines(), group);
+    reinitialize(pipeline.getMachines(), RatisHelper.emptyRaftGroup(), group);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  public void destroyPipeline()
+      throws IOException {
+    RaftGroupId groupId = pipeline.getId().getRaftGroupID();
+    RaftGroup currentGroup =
+        RatisHelper.newRaftGroup(groupId, pipeline.getMachines());
+    LOG.debug("destroying pipeline:{} with nodes:{}",
+        pipeline.getId(), currentGroup.getPeers());
+    reinitialize(pipeline.getMachines(), currentGroup,
+        RatisHelper.emptyRaftGroup());
   }
 
   /**
@@ -107,8 +121,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     return HddsProtos.ReplicationType.RATIS;
   }
 
-  private void reinitialize(List<DatanodeDetails> datanodes, RaftGroup group)
-      throws IOException {
+  private void reinitialize(List<DatanodeDetails> datanodes, RaftGroup oldGroup,
+      RaftGroup newGroup) throws IOException {
     if (datanodes.isEmpty()) {
       return;
     }
@@ -116,7 +130,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     IOException exception = null;
     for (DatanodeDetails d : datanodes) {
       try {
-        reinitialize(d, group);
+        reinitialize(d, oldGroup, newGroup);
       } catch (IOException ioe) {
         if (exception == null) {
           exception = new IOException(
@@ -135,14 +149,18 @@ public final class XceiverClientRatis extends XceiverClientSpi {
    * Adds a new peers to the Ratis Ring.
    *
    * @param datanode - new datanode
-   * @param group    - Raft group
+   * @param oldGroup    - previous Raft group
+   * @param newGroup    - new Raft group
    * @throws IOException - on Failure.
    */
-  private void reinitialize(DatanodeDetails datanode, RaftGroup group)
+  private void reinitialize(DatanodeDetails datanode, RaftGroup oldGroup,
+      RaftGroup newGroup)
       throws IOException {
     final RaftPeer p = RatisHelper.toRaftPeer(datanode);
-    try (RaftClient client = RatisHelper.newRaftClient(rpcType, p)) {
-      client.reinitialize(group, p.getId());
+    try (RaftClient client = oldGroup == RatisHelper.emptyRaftGroup() ?
+        RatisHelper.newRaftClient(rpcType, p) :
+        RatisHelper.newRaftClient(rpcType, p, oldGroup)) {
+      client.reinitialize(newGroup, p.getId());
     } catch (IOException ioe) {
       LOG.error("Failed to reinitialize RaftPeer:{} datanode: {}  ",
           p, datanode, ioe);
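
The create/destroy symmetry above reduces to two reinitialize calls: a peer
joins a pipeline by moving from the empty Raft group to the pipeline's group,
and leaves by moving back. A hedged sketch, assuming datanode, groupId,
machines and rpcType as in the surrounding code:

  RaftPeer peer = RatisHelper.toRaftPeer(datanode);
  RaftGroup pipelineGroup = RatisHelper.newRaftGroup(groupId, machines);

  // createPipeline: empty group -> pipeline group
  try (RaftClient client = RatisHelper.newRaftClient(rpcType, peer)) {
    client.reinitialize(pipelineGroup, peer.getId());
  }

  // destroyPipeline: pipeline group -> empty group
  try (RaftClient client =
      RatisHelper.newRaftClient(rpcType, peer, pipelineGroup)) {
    client.reinitialize(RatisHelper.emptyRaftGroup(), peer.getId());
  }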

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
index 8c8cb95..fed589c 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/client/ContainerOperationClient.java
@@ -180,7 +180,7 @@ public class ContainerOperationClient implements ScmClient {
     //    ObjectStageChangeRequestProto.Op.create,
     //    ObjectStageChangeRequestProto.Stage.begin);
 
-    client.createPipeline(pipeline);
+    client.createPipeline();
 
     //storageContainerLocationClient.notifyObjectStageChange(
     //    ObjectStageChangeRequestProto.Type.pipeline,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 3bda29f..4dc7e0a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -56,6 +56,11 @@ public final class HddsConfigKeys {
   public static final int HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT =
       20;
 
+  public static final String HDDS_PIPELINE_ACTION_MAX_LIMIT =
+      "hdds.pipeline.action.max.limit";
+  public static final int HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT =
+      20;
+
   // Configuration to allow volume choosing policy.
   public static final String HDDS_DATANODE_VOLUME_CHOOSING_POLICY =
       "hdds.datanode.volume.choosing.policy";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 62d9ef5..22ba714 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdds.scm;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.util.TimeDuration;
 
 import java.util.concurrent.TimeUnit;
@@ -57,6 +58,10 @@ public final class ScmConfigKeys {
       = "dfs.container.ratis.num.write.chunk.threads";
   public static final int DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT
       = 60;
+  public static final String DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY
+      = "dfs.container.ratis.replication.level";
+  public static final ReplicationLevel
+      DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT = ReplicationLevel.MAJORITY;
   public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY =
       "dfs.container.ratis.segment.size";
   public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT =
@@ -76,6 +81,12 @@ public final class ScmConfigKeys {
       DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT =
       TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
 
+  public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
+      "dfs.ratis.server.failure.duration";
+  public static final TimeDuration
+      DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT =
+      TimeDuration.valueOf(120, TimeUnit.SECONDS);
+
   // TODO : this is copied from OzoneConsts, may need to move to a better place
   public static final String OZONE_SCM_CHUNK_SIZE_KEY = "ozone.scm.chunk.size";
   // 16 MB by default
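
A hedged sketch of consuming the two new keys, mirroring how
XceiverServerRatis reads them later in this patch (note that the
TimeDuration default carries its own unit):

  OzoneConfiguration conf = new OzoneConfiguration();
  TimeUnit unit =
      ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT.getUnit();
  long duration = conf.getTimeDuration(
      ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
      ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT.getDuration(),
      unit);
  long failureTimeoutMs =
      TimeDuration.valueOf(duration, unit).toLong(TimeUnit.MILLISECONDS);
  ReplicationLevel level = conf.getEnum(
      ScmConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY,
      ScmConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT);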

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index b3b0da2..e8ef5c5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -111,10 +111,14 @@ public abstract class XceiverClientSpi implements Closeable {
 
   /**
    * Create a pipeline.
-   *
-   * @param pipeline -  pipeline to be created.
    */
-  public abstract void createPipeline(Pipeline pipeline) throws IOException;
+  public abstract void createPipeline() throws IOException;
+
+  /**
+   * Destroy a pipeline.
+   * @throws IOException
+   */
+  public abstract void destroyPipeline() throws IOException;
 
   /**
    * Returns pipeline Type.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 0f2b108..f07d599 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 
+import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.util.TimeDuration;
 
 /**
@@ -214,6 +215,11 @@ public final class OzoneConfigKeys {
       = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY;
   public static final int DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT
       = ScmConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT;
+  public static final String DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY;
+  public static final ReplicationLevel
+      DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT
+      = ScmConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT;
   public static final String DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY
       = ScmConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY;
   public static final int DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT
@@ -237,6 +243,12 @@ public final class OzoneConfigKeys {
       DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT =
       ScmConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT;
 
+  public static final String DFS_RATIS_SERVER_FAILURE_DURATION_KEY =
+      ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY;
+  public static final TimeDuration
+      DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT =
+      ScmConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT;
+
   public static final String OZONE_SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL =
       "ozone.web.authentication.kerberos.principal";
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
index 9c25e20..48fdd64 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
@@ -30,6 +30,7 @@ import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.proto.RaftProtos;
 import org.apache.ratis.util.SizeInBytes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,6 +40,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 /**
@@ -48,8 +50,19 @@ public interface RatisHelper {
   Logger LOG = LoggerFactory.getLogger(RatisHelper.class);
 
   static String toRaftPeerIdString(DatanodeDetails id) {
-    return id.getUuidString() + "_" +
-        id.getPort(DatanodeDetails.Port.Name.RATIS).getValue();
+    return id.getUuidString();
+  }
+
+  static UUID toDatanodeId(String peerIdString) {
+    return UUID.fromString(peerIdString);
+  }
+
+  static UUID toDatanodeId(RaftPeerId peerId) {
+    return toDatanodeId(peerId.toString());
+  }
+
+  static UUID toDatanodeId(RaftProtos.RaftPeerProto peerId) {
+    return toDatanodeId(RaftPeerId.valueOf(peerId.getId()));
   }
 
   static String toRaftPeerAddressString(DatanodeDetails id) {
@@ -117,6 +130,11 @@ public interface RatisHelper {
         newRaftGroup(new ArrayList<>(Arrays.asList(leader))));
   }
 
+  static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
+      RaftGroup group) {
+    return newRaftClient(rpcType, leader.getId(), group);
+  }
+
   static RaftClient newRaftClient(
       RpcType rpcType, RaftPeerId leader, RaftGroup group) {
     LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group);
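
With the peer id now being just the datanode UUID string, the mapping is
reversible. A hedged round-trip sketch, where dn is an assumed
DatanodeDetails instance:

  RaftPeerId peerId = RatisHelper.toRaftPeerId(dn);
  // The new toDatanodeId overloads recover the datanode UUID from the id.
  UUID datanodeId = RatisHelper.toDatanodeId(peerId);
  assert datanodeId.equals(dn.getUuid());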

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2112ae3..778d641 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -127,6 +127,15 @@
     </description>
   </property>
   <property>
+    <name>dfs.container.ratis.replication.level</name>
+    <value>MAJORITY</value>
+    <tag>OZONE, RATIS</tag>
+    <description>Replication level to be used by datanode for submitting a
+      container command to ratis. Available replication levels are ALL and
+      MAJORITY; MAJORITY is used as the default replication level.
+    </description>
+  </property>
+  <property>
     <name>dfs.container.ratis.segment.size</name>
     <value>1073741824</value>
     <tag>OZONE, RATIS, PERFORMANCE</tag>
@@ -155,6 +164,15 @@
     <description>The timeout duration for ratis server request.</description>
   </property>
   <property>
+    <name>dfs.ratis.server.failure.duration</name>
+    <value>120s</value>
+    <tag>OZONE, RATIS, MANAGEMENT</tag>
+    <description>The timeout duration for ratis server failure detection.
+      Once the threshold is reached, the ratis state machine is informed
+      about the failure in the ratis ring.
+    </description>
+  </property>
+  <property>
     <name>hdds.node.report.interval</name>
     <value>60000ms</value>
     <tag>OZONE, CONTAINER, MANAGEMENT</tag>
@@ -1105,6 +1123,15 @@
   </property>
 
   <property>
+    <name>hdds.pipeline.action.max.limit</name>
+    <value>20</value>
+    <tag>DATANODE</tag>
+    <description>
+      Maximum number of Pipeline Actions sent by the datanode to SCM in a
+      single heartbeat.
+    </description>
+  </property>
+  <property>
     <name>hdds.scm.watcher.timeout</name>
     <value>10m</value>
     <tag>OZONE, SCM, MANAGEMENT</tag>
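
A hedged sketch of overriding the two new properties from code, as
TestNodeFailure in this patch does for the failure duration:

  OzoneConfiguration conf = new OzoneConfiguration();
  // dfs.ratis.server.failure.duration, lowered for tests
  conf.setTimeDuration(OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
      10, TimeUnit.SECONDS);
  // dfs.container.ratis.replication.level, e.g. ALL instead of MAJORITY
  conf.set(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY, "ALL");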

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index a342294..c2d5421 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -21,6 +21,8 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerAction;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
@@ -66,6 +68,7 @@ public class StateContext {
   private final Configuration conf;
   private final Queue<GeneratedMessage> reports;
   private final Queue<ContainerAction> containerActions;
+  private final Queue<PipelineAction> pipelineActions;
   private DatanodeStateMachine.DatanodeStates state;
 
   /**
@@ -91,6 +94,7 @@ public class StateContext {
     cmdStatusMap = new ConcurrentHashMap<>();
     reports = new LinkedList<>();
     containerActions = new LinkedList<>();
+    pipelineActions = new LinkedList<>();
     lock = new ReentrantLock();
     stateExecutionCount = new AtomicLong(0);
   }
@@ -257,6 +261,47 @@ public class StateContext {
   }
 
   /**
+   * Add PipelineAction to PipelineAction queue if it's not present.
+   *
+   * @param pipelineAction PipelineAction to be added
+   */
+  public void addPipelineActionIfAbsent(PipelineAction pipelineAction) {
+    synchronized (pipelineActions) {
+      /**
+       * If the pipelineAction queue already contains an entry for the same
+       * pipeline id and action, just return.
+       * Note: pipelineActions.contains(pipelineAction) cannot be used here
+       * because pipelineAction carries a msg string, so two actions that
+       * differ only in the msg would be treated as distinct and the same
+       * action could be queued multiple times.
+       */
+      for (PipelineAction pipelineActionIter : pipelineActions) {
+        if (pipelineActionIter.getAction() == pipelineAction.getAction()
+            && pipelineActionIter.hasClosePipeline() && pipelineAction
+            .hasClosePipeline()
+            && pipelineActionIter.getClosePipeline().getPipelineID()
+            .equals(pipelineAction.getClosePipeline().getPipelineID())) {
+          return;
+        }
+      }
+      pipelineActions.add(pipelineAction);
+    }
+  }
+
+  /**
+   * Returns pending PipelineActions from the PipelineAction queue, limited
+   * to maxLimit entries, or an empty list if the queue is empty.
+   *
+   * @return List<PipelineAction>
+   */
+  public List<PipelineAction> getPendingPipelineAction(int maxLimit) {
+    synchronized (pipelineActions) {
+      return pipelineActions.parallelStream().limit(maxLimit)
+          .collect(Collectors.toList());
+    }
+  }
+
+  /**
    * Returns the next task to get executed by the datanode state machine.
    * @return A callable that will be executed by the
    * {@link DatanodeStateMachine}
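
A hedged sketch, not in the patch, extracting the duplicate check above into
a helper; it uses only the getters already visible in this file:

  private static boolean isSameAction(PipelineAction left,
      PipelineAction right) {
    // The same action type on the same pipeline counts as a duplicate,
    // regardless of the msg string.
    return left.getAction() == right.getAction()
        && left.hasClosePipeline() && right.hasClosePipeline()
        && left.getClosePipeline().getPipelineID()
            .equals(right.getClosePipeline().getPipelineID());
  }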

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 020fb71..5769e6d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -25,6 +25,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerAction;
@@ -57,6 +61,10 @@ import static org.apache.hadoop.hdds.HddsConfigKeys
     .HDDS_CONTAINER_ACTION_MAX_LIMIT;
 import static org.apache.hadoop.hdds.HddsConfigKeys
     .HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_PIPELINE_ACTION_MAX_LIMIT;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT;
 
 /**
  * Heartbeat class for SCMs.
@@ -70,6 +78,7 @@ public class HeartbeatEndpointTask
   private DatanodeDetailsProto datanodeDetailsProto;
   private StateContext context;
   private int maxContainerActionsPerHB;
+  private int maxPipelineActionsPerHB;
 
   /**
    * Constructs a SCM heart beat.
@@ -83,6 +92,8 @@ public class HeartbeatEndpointTask
     this.context = context;
     this.maxContainerActionsPerHB = conf.getInt(HDDS_CONTAINER_ACTION_MAX_LIMIT,
         HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT);
+    this.maxPipelineActionsPerHB = conf.getInt(HDDS_PIPELINE_ACTION_MAX_LIMIT,
+        HDDS_PIPELINE_ACTION_MAX_LIMIT_DEFAULT);
   }
 
   /**
@@ -121,6 +132,7 @@ public class HeartbeatEndpointTask
               .setDatanodeDetails(datanodeDetailsProto);
       addReports(requestBuilder);
       addContainerActions(requestBuilder);
+      addPipelineActions(requestBuilder);
       SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
           .sendHeartbeat(requestBuilder.build());
       processResponse(reponse, datanodeDetailsProto);
@@ -169,6 +181,22 @@ public class HeartbeatEndpointTask
     }
   }
 
+  /**
+   * Adds all the pending PipelineActions to the heartbeat.
+   *
+   * @param requestBuilder builder to which the actions have to be added.
+   */
+  private void addPipelineActions(
+      SCMHeartbeatRequestProto.Builder requestBuilder) {
+    List<PipelineAction> actions = context.getPendingPipelineAction(
+        maxPipelineActionsPerHB);
+    if (!actions.isEmpty()) {
+      PipelineActionsProto pap = PipelineActionsProto.newBuilder()
+          .addAllPipelineActions(actions)
+          .build();
+      requestBuilder.setPipelineActions(pap);
+    }
+  }
 
   /**
    * Returns a builder class for HeartbeatEndpointTask task.
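
The datanode-side flow added in this patch, as a hedged sketch: an action
queued by the state machine is drained, up to hdds.pipeline.action.max.limit
entries, into the next heartbeat (context, action and
maxPipelineActionsPerHB are assumed instances of the types used above).

  context.addPipelineActionIfAbsent(action);  // queued once, deduplicated
  List<PipelineAction> batch =
      context.getPendingPipelineAction(maxPipelineActionsPerHB);
  PipelineActionsProto pap = PipelineActionsProto.newBuilder()
      .addAllPipelineActions(batch)
      .build();
  requestBuilder.setPipelineActions(pap);     // sent with the heartbeat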

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 68d6d5b..1636f24 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.ozone.container.common.transport.server.ratis;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.shaded.com.google.protobuf
@@ -42,6 +43,7 @@ import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.server.storage.RaftStorage;
 import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto;
 import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SMLogEntryProto;
 import org.apache.ratis.statemachine.StateMachineStorage;
@@ -115,6 +117,7 @@ public class ContainerStateMachine extends BaseStateMachine {
       = new SimpleStateMachineStorage();
   private final ContainerDispatcher dispatcher;
   private ThreadPoolExecutor chunkExecutor;
+  private final XceiverServerRatis ratisServer;
   private final ConcurrentHashMap<Long, CompletableFuture<Message>>
       writeChunkFutureMap;
   private final ConcurrentHashMap<Long, StateMachineHelper> stateMachineMap;
@@ -124,9 +127,10 @@ public class ContainerStateMachine extends BaseStateMachine {
   private final CSMMetrics metrics;
 
   public ContainerStateMachine(ContainerDispatcher dispatcher,
-      ThreadPoolExecutor chunkExecutor) {
+      ThreadPoolExecutor chunkExecutor, XceiverServerRatis ratisServer) {
     this.dispatcher = dispatcher;
     this.chunkExecutor = chunkExecutor;
+    this.ratisServer = ratisServer;
     this.writeChunkFutureMap = new ConcurrentHashMap<>();
     this.stateMachineMap = new ConcurrentHashMap<>();
     metrics = CSMMetrics.create();
@@ -401,6 +405,17 @@ public class ContainerStateMachine extends BaseStateMachine {
   }
 
   @Override
+  public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
+    ratisServer.handleNodeSlowness(group, roleInfoProto);
+  }
+
+  @Override
+  public void notifyExtendedNoLeader(RaftGroup group,
+      RoleInfoProto roleInfoProto) {
+    ratisServer.handleNoLeader(group, roleInfoProto);
+  }
+
+  @Override
   public void close() throws IOException {
   }
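
TestContainerStateMachine and the test Xceiver servers earlier in this mail
pass null for ratisServer. A hedged null-safe variant of the new callback,
in case a notification ever fires in such a test:

  @Override
  public void notifySlowness(RaftGroup group, RoleInfoProto roleInfoProto) {
    // Guard against the null ratisServer used by the unit tests.
    if (ratisServer != null) {
      ratisServer.handleNodeSlowness(group, roleInfoProto);
    }
  }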
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 8256722..f775396 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -26,9 +26,14 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineAction;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.common.transport.server
     .XceiverServerSpi;
 import org.apache.ratis.RaftConfigKeys;
@@ -43,10 +48,15 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.NotLeaderException;
 import org.apache.ratis.protocol.StateMachineException;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.shaded.proto.RaftProtos;
+import org.apache.ratis.shaded.proto.RaftProtos.RoleInfoProto;
 import org.apache.ratis.shaded.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
@@ -59,6 +69,7 @@ import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.net.SocketAddress;
 import java.util.Objects;
+import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -81,24 +92,72 @@ public final class XceiverServerRatis implements XceiverServerSpi {
   private final RaftServer server;
   private ThreadPoolExecutor chunkExecutor;
   private ClientId clientId = ClientId.randomId();
+  private final StateContext context;
+  private final ReplicationLevel replicationLevel;
+  private long nodeFailureTimeoutMs;
 
   private XceiverServerRatis(DatanodeDetails dd, int port, String storageDir,
-      ContainerDispatcher dispatcher, Configuration conf) throws IOException {
+      ContainerDispatcher dispatcher, Configuration conf, StateContext context)
+      throws IOException {
+    Objects.requireNonNull(dd, "id == null");
+    this.port = port;
+    RaftProperties serverProperties = newRaftProperties(conf, storageDir);
+    final int numWriteChunkThreads = conf.getInt(
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
+        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
+    chunkExecutor =
+        new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
+            100, TimeUnit.SECONDS,
+            new ArrayBlockingQueue<>(1024),
+            new ThreadPoolExecutor.CallerRunsPolicy());
+    this.context = context;
+    this.replicationLevel =
+        conf.getEnum(OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_KEY,
+            OzoneConfigKeys.DFS_CONTAINER_RATIS_REPLICATION_LEVEL_DEFAULT);
+    ContainerStateMachine stateMachine =
+        new ContainerStateMachine(dispatcher, chunkExecutor, this);
+    this.server = RaftServer.newBuilder()
+        .setServerId(RatisHelper.toRaftPeerId(dd))
+        .setGroup(RatisHelper.emptyRaftGroup())
+        .setProperties(serverProperties)
+        .setStateMachine(stateMachine)
+        .build();
+  }
+
 
+  private RaftProperties newRaftProperties(Configuration conf,
+      String storageDir) {
+    final RaftProperties properties = new RaftProperties();
+
+    // Set rpc type
     final String rpcType = conf.get(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
     final RpcType rpc = SupportedRpcType.valueOfIgnoreCase(rpcType);
+    RaftConfigKeys.Rpc.setType(properties, rpc);
+
+    // set raft segment size
     final int raftSegmentSize = conf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_SIZE_DEFAULT);
+    RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
+        SizeInBytes.valueOf(raftSegmentSize));
+
+    // set raft segment pre-allocated size
     final int raftSegmentPreallocatedSize = conf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT);
+    RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
+        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
+    RaftServerConfigKeys.Log.setPreallocatedSize(properties,
+        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
+
+    // Set max write buffer size, which is the scm chunk size
     final int maxChunkSize = OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE;
-    final int numWriteChunkThreads = conf.getInt(
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_KEY,
-        OzoneConfigKeys.DFS_CONTAINER_RATIS_NUM_WRITE_CHUNK_THREADS_DEFAULT);
+    RaftServerConfigKeys.Log.setWriteBufferSize(properties,
+        SizeInBytes.valueOf(maxChunkSize));
+
+    // Set the client requestTimeout
     TimeUnit timeUnit =
         OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
             .getUnit();
@@ -108,6 +167,10 @@ public final class XceiverServerRatis implements XceiverServerSpi {
             .getDuration(), timeUnit);
     final TimeDuration clientRequestTimeout =
         TimeDuration.valueOf(duration, timeUnit);
+    RaftClientConfigKeys.Rpc
+        .setRequestTimeout(properties, clientRequestTimeout);
+
+    // Set the server Request timeout
     timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_REQUEST_TIMEOUT_DURATION_DEFAULT
         .getUnit();
     duration = conf.getTimeDuration(
@@ -116,61 +179,44 @@ public final class XceiverServerRatis implements XceiverServerSpi {
             .getDuration(), timeUnit);
     final TimeDuration serverRequestTimeout =
         TimeDuration.valueOf(duration, timeUnit);
-
-    Objects.requireNonNull(dd, "id == null");
-    this.port = port;
-    RaftProperties serverProperties =
-        newRaftProperties(rpc, port, storageDir, maxChunkSize, raftSegmentSize,
-            raftSegmentPreallocatedSize);
-    setRequestTimeout(serverProperties, clientRequestTimeout,
-        serverRequestTimeout);
-
-    chunkExecutor =
-        new ThreadPoolExecutor(numWriteChunkThreads, numWriteChunkThreads,
-            100, TimeUnit.SECONDS,
-            new ArrayBlockingQueue<>(1024),
-            new ThreadPoolExecutor.CallerRunsPolicy());
-    ContainerStateMachine stateMachine =
-        new ContainerStateMachine(dispatcher, chunkExecutor);
-    this.server = RaftServer.newBuilder()
-        .setServerId(RatisHelper.toRaftPeerId(dd))
-        .setGroup(RatisHelper.emptyRaftGroup())
-        .setProperties(serverProperties)
-        .setStateMachine(stateMachine)
-        .build();
-  }
-
-  private static void setRequestTimeout(RaftProperties serverProperties,
-      TimeDuration clientRequestTimeout, TimeDuration serverRequestTimeout) {
-    RaftClientConfigKeys.Rpc
-        .setRequestTimeout(serverProperties, clientRequestTimeout);
     RaftServerConfigKeys.Rpc
-        .setRequestTimeout(serverProperties, serverRequestTimeout);
-  }
+        .setRequestTimeout(properties, serverRequestTimeout);
 
-  private static RaftProperties newRaftProperties(
-      RpcType rpc, int port, String storageDir, int scmChunkSize,
-      int raftSegmentSize, int raftSegmentPreallocatedSize) {
-    final RaftProperties properties = new RaftProperties();
+    // Enable batch append on raft server
     RaftServerConfigKeys.Log.Appender.setBatchEnabled(properties, true);
-    RaftServerConfigKeys.Log.Appender.setBufferCapacity(properties,
-        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
-    RaftServerConfigKeys.Log.setWriteBufferSize(properties,
-        SizeInBytes.valueOf(scmChunkSize));
-    RaftServerConfigKeys.Log.setPreallocatedSize(properties,
-        SizeInBytes.valueOf(raftSegmentPreallocatedSize));
-    RaftServerConfigKeys.Log.setSegmentSizeMax(properties,
-        SizeInBytes.valueOf(raftSegmentSize));
-    RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
-    RaftConfigKeys.Rpc.setType(properties, rpc);
 
+    // Set the maximum cache segments
     RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
-    GrpcConfigKeys.setMessageSizeMax(properties,
-        SizeInBytes.valueOf(scmChunkSize + raftSegmentPreallocatedSize));
+
+    // Set the ratis leader election timeout
     RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
         TimeDuration.valueOf(800, TimeUnit.MILLISECONDS));
     RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
         TimeDuration.valueOf(1000, TimeUnit.MILLISECONDS));
+
+    // Set the node failure timeout
+    timeUnit = OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT
+        .getUnit();
+    duration = conf.getTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
+        OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT
+            .getDuration(), timeUnit);
+    final TimeDuration nodeFailureTimeout =
+        TimeDuration.valueOf(duration, timeUnit);
+    RaftServerConfigKeys.setLeaderElectionTimeout(properties,
+        nodeFailureTimeout);
+    RaftServerConfigKeys.Rpc.setSlownessTimeout(properties,
+        nodeFailureTimeout);
+    nodeFailureTimeoutMs = nodeFailureTimeout.toLong(TimeUnit.MILLISECONDS);
+
+    // Set the ratis storage directory
+    RaftServerConfigKeys.setStorageDir(properties, new File(storageDir));
+
+    // For grpc set the maximum message size
+    GrpcConfigKeys.setMessageSizeMax(properties,
+        SizeInBytes.valueOf(maxChunkSize + raftSegmentPreallocatedSize));
+
+    // Set the ratis port number
     if (rpc == SupportedRpcType.GRPC) {
       GrpcConfigKeys.Server.setPort(properties, port);
     } else if (rpc == SupportedRpcType.NETTY) {
@@ -181,7 +227,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
 
   public static XceiverServerRatis newXceiverServerRatis(
       DatanodeDetails datanodeDetails, Configuration ozoneConf,
-      ContainerDispatcher dispatcher) throws IOException {
+      ContainerDispatcher dispatcher, StateContext context) throws IOException {
     final String ratisDir = File.separator + "ratis";
     int localPort = ozoneConf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
@@ -226,7 +272,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     datanodeDetails.setPort(
         DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort));
     return new XceiverServerRatis(datanodeDetails, localPort, storageDir,
-        dispatcher, ozoneConf);
+        dispatcher, ozoneConf, context);
   }
 
   @Override
@@ -296,7 +342,7 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     // the request here are applied on all the raft servers.
     RaftClientRequest raftClientRequest =
         createRaftClientRequest(request, pipelineID,
-            RaftClientRequest.writeRequestType(ReplicationLevel.ALL));
+            RaftClientRequest.writeRequestType(replicationLevel));
     CompletableFuture<RaftClientReply> reply =
         server.submitClientRequestAsync(raftClientRequest);
     reply.thenAccept(this::processReply);
@@ -309,4 +355,57 @@ public final class XceiverServerRatis implements XceiverServerSpi {
         PipelineID.getFromProtobuf(pipelineID).getRaftGroupID(),
         nextCallId(), 0, Message.valueOf(request.toByteString()), type);
   }
+
+  private void handlePipelineFailure(RaftGroupId groupId,
+      RoleInfoProto roleInfoProto) {
+    String msg;
+    UUID datanode = RatisHelper.toDatanodeId(roleInfoProto.getSelf());
+    RaftPeerId id = RaftPeerId.valueOf(roleInfoProto.getSelf().getId());
+    switch (roleInfoProto.getRole()) {
+    case CANDIDATE:
+      msg = datanode + " is in candidate state for " +
+          roleInfoProto.getCandidateInfo().getLastLeaderElapsedTimeMs() + "ms";
+      break;
+    case LEADER:
+      StringBuilder sb = new StringBuilder();
+      sb.append(datanode).append(" has not seen follower(s)");
+      for (RaftProtos.ServerRpcProto follower : roleInfoProto.getLeaderInfo()
+          .getFollowerInfoList()) {
+        if (follower.getLastRpcElapsedTimeMs() > nodeFailureTimeoutMs) {
+          sb.append(" ").append(RatisHelper.toDatanodeId(follower.getId()))
+              .append(" for ").append(follower.getLastRpcElapsedTimeMs())
+              .append("ms");
+        }
+      }
+      msg = sb.toString();
+      break;
+    default:
+      LOG.error("unknown state:" + roleInfoProto.getRole());
+      throw new IllegalStateException("node" + id + " is in illegal role "
+          + roleInfoProto.getRole());
+    }
+
+    PipelineID pipelineID = PipelineID.valueOf(groupId);
+    ClosePipelineInfo.Builder closePipelineInfo =
+        ClosePipelineInfo.newBuilder()
+            .setPipelineID(pipelineID.getProtobuf())
+            .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED)
+            .setDetailedReason(msg);
+
+    PipelineAction action = PipelineAction.newBuilder()
+        .setClosePipeline(closePipelineInfo)
+        .setAction(PipelineAction.Action.CLOSE)
+        .build();
+    context.addPipelineActionIfAbsent(action);
+  }
+
+  void handleNodeSlowness(
+      RaftGroup group, RoleInfoProto roleInfoProto) {
+    handlePipelineFailure(group.getGroupId(), roleInfoProto);
+  }
+
+  void handleNoLeader(
+      RaftGroup group, RoleInfoProto roleInfoProto) {
+    handlePipelineFailure(group.getGroupId(), roleInfoProto);
+  }
 }
\ No newline at end of file

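The node-failure timeout added above follows the same pattern as the client
and server request timeouts: read the duration from the Configuration in the
unit of the default value, then wrap it in a Ratis TimeDuration. Below is a
minimal sketch of setting and reading the key, mirroring the
conf.setTimeDuration() call in TestNodeFailure; the 10-second value is
illustrative, not a recommended default.

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.ozone.OzoneConfigKeys;
    import org.apache.ratis.util.TimeDuration;

    public class NodeFailureTimeoutSketch {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Illustrative value only: declare a peer failed after 10 seconds
        // without a successful Raft RPC.
        conf.setTimeDuration(
            OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
            10, TimeUnit.SECONDS);

        // Read it back exactly as newRaftProperties() does above.
        TimeUnit unit = OzoneConfigKeys
            .DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT.getUnit();
        long duration = conf.getTimeDuration(
            OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_KEY,
            OzoneConfigKeys.DFS_RATIS_SERVER_FAILURE_DURATION_DEFAULT
                .getDuration(), unit);
        TimeDuration nodeFailureTimeout =
            TimeDuration.valueOf(duration, unit);
        System.out.println("node failure timeout: " + nodeFailureTimeout);
      }
    }
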
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index b1bf381..72a5804 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -84,7 +84,7 @@ public class OzoneContainer {
         new XceiverServerGrpc(datanodeDetails, this.config, this
             .hddsDispatcher, createReplicationService()),
         XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this
-            .config, hddsDispatcher)
+            .config, hddsDispatcher, context)
     };
 
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 1a3496d..0a69343 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -81,6 +81,7 @@ message SCMHeartbeatRequestProto {
   optional ContainerReportsProto containerReport = 3;
   optional CommandStatusReportsProto commandStatusReport = 4;
   optional ContainerActionsProto containerActions = 5;
+  optional PipelineActionsProto pipelineActions = 6;
 }
 
 /*
@@ -162,6 +163,31 @@ message ContainerAction {
   optional Reason reason = 3;
 }
 
+message PipelineActionsProto {
+  repeated PipelineAction pipelineActions = 1;
+}
+
+message ClosePipelineInfo {
+  enum Reason {
+    PIPELINE_FAILED = 1;
+  }
+  required PipelineID pipelineID = 1;
+  optional Reason reason = 3;
+  optional string detailedReason = 4;
+}
+
+message PipelineAction {
+  enum Action {
+    CLOSE = 1;
+  }
+
+  /**
+   * Action identifies the type of the pipeline action.
+   */
+  required Action action = 1;
+  optional ClosePipelineInfo closePipeline = 2;
+}
+
 /**
 A container report contains the following information.
 */

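For reference, the new messages compose as shown below. This is a hedged
sketch using the standard protobuf-generated Java builders; the
pipelineIdProto parameter stands in for PipelineID#getProtobuf() on a real
pipeline, and the detailed-reason text is illustrative.

    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.PipelineAction;
    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.PipelineActionsProto;

    public final class PipelineActionSketch {
      public static PipelineActionsProto buildCloseAction(
          HddsProtos.PipelineID pipelineIdProto) {
        ClosePipelineInfo closeInfo = ClosePipelineInfo.newBuilder()
            .setPipelineID(pipelineIdProto)
            .setReason(ClosePipelineInfo.Reason.PIPELINE_FAILED)
            .setDetailedReason("illustrative failure description")
            .build();
        PipelineAction action = PipelineAction.newBuilder()
            .setAction(PipelineAction.Action.CLOSE)
            .setClosePipeline(closeInfo)
            .build();
        // The repeated field collects every pending action for one
        // heartbeat (SCMHeartbeatRequestProto.pipelineActions, field 6).
        return PipelineActionsProto.newBuilder()
            .addPipelineActions(action)
            .build();
      }
    }
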
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 8f5d8d6..3554339 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -200,8 +200,7 @@ public class ContainerMapping implements Mapping {
       Pipeline pipeline;
       if (contInfo.isContainerOpen()) {
         // If pipeline with given pipeline Id already exist return it
-        pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID(),
-            contInfo.getReplicationType());
+        pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID());
         if (pipeline == null) {
           pipeline = pipelineSelector
               .getReplicationPipeline(contInfo.getReplicationType(),
@@ -389,8 +388,7 @@ public class ContainerMapping implements Mapping {
           .updateContainerState(containerInfo, event);
       if (!updatedContainer.isContainerOpen()) {
         Pipeline pipeline = pipelineSelector
-            .getPipeline(containerInfo.getPipelineID(),
-                containerInfo.getReplicationType());
+            .getPipeline(containerInfo.getPipelineID());
         pipelineSelector.closePipelineIfNoOpenContainers(pipeline);
       }
       containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
@@ -470,8 +468,7 @@ public class ContainerMapping implements Mapping {
       return null;
     }
     Pipeline pipeline = pipelineSelector
-        .getPipeline(containerInfo.getPipelineID(),
-            containerInfo.getReplicationType());
+        .getPipeline(containerInfo.getPipelineID());
     if (pipeline == null) {
       pipeline = pipelineSelector
           .getReplicationPipeline(containerInfo.getReplicationType(),
@@ -480,6 +477,24 @@ public class ContainerMapping implements Mapping {
     return new ContainerWithPipeline(containerInfo, pipeline);
   }
 
+  public void handlePipelineClose(PipelineID pipelineID) {
+    try {
+      Pipeline pipeline = pipelineSelector.getPipeline(pipelineID);
+      if (pipeline != null) {
+        pipelineSelector.finalizePipeline(pipeline);
+      } else {
+        LOG.debug("pipeline:{} not found", pipelineID);
+      }
+    } catch (Exception e) {
+      LOG.info("failed to close pipeline:{}", pipelineID, e);
+    }
+  }
+
+  public Set<PipelineID> getPipelineOnDatanode(
+      DatanodeDetails datanodeDetails) {
+    return pipelineSelector.getPipelineId(datanodeDetails.getUuid());
+  }
+
   /**
    * Process container report from Datanode.
    * <p>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 7afed42..421d34e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -486,10 +486,9 @@ public class ContainerStateManager implements Closeable {
    * @throws IOException
    */
   public ContainerWithPipeline getContainer(PipelineSelector selector,
-      ContainerID containerID) throws IOException {
+      ContainerID containerID) {
     ContainerInfo info = containers.getContainerInfo(containerID.getId());
-    Pipeline pipeline = selector.getPipeline(info.getPipelineID(),
-        info.getReplicationType());
+    Pipeline pipeline = selector.getPipeline(info.getPipelineID());
     return new ContainerWithPipeline(info, pipeline);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
index f4b5bb2..1b0c57c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/Mapping.java
@@ -25,11 +25,13 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 /**
  * Mapping class contains the mapping from a name to a pipeline mapping. This is
@@ -135,4 +137,16 @@ public interface Mapping extends Closeable {
   ContainerWithPipeline getMatchingContainerWithPipeline(long size,
       String owner, ReplicationType type, ReplicationFactor factor,
       LifeCycleState state) throws IOException;
+
+  /**
+   * Handle a pipeline close event.
+   * @param pipelineID pipeline id
+   */
+  void handlePipelineClose(PipelineID pipelineID);
+
+  /**
+   * Get the set of pipelines for a specific datanode.
+   * @param datanodeDetails datanode for which pipelines need to be fetched.
+   * @return set of PipelineIDs the given datanode is part of.
+   */
+  Set<PipelineID> getPipelineOnDatanode(DatanodeDetails datanodeDetails);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 9a4f887..03df8eb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -29,6 +29,9 @@ import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
     .ReplicationStatus;
 import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler.CloseContainerRetryableReq;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .PipelineActionsFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .ContainerActionsFromDatanode;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
@@ -72,6 +75,23 @@ public final class SCMEvents {
   public static final TypedEvent<ContainerActionsFromDatanode>
       CONTAINER_ACTIONS = new TypedEvent<>(ContainerActionsFromDatanode.class,
       "Container_Actions");
+
+  /**
+   * PipelineActions are sent by datanodes. SCMDatanodeHeartbeatDispatcher
+   * receives them in the heartbeat and generates this PIPELINE_ACTIONS
+   * event.
+   */
+  public static final TypedEvent<PipelineActionsFromDatanode>
+      PIPELINE_ACTIONS = new TypedEvent<>(PipelineActionsFromDatanode.class,
+      "Pipeline_Actions");
+
+  /**
+   * Pipeline close events are triggered to close a pipeline because of
+   * failure, a stale node, decommissioning, etc.
+   */
+  public static final TypedEvent<PipelineID>
+      PIPELINE_CLOSE = new TypedEvent<>(PipelineID.class,
+      "Pipeline_Close");
+
   /**
   * A Command status report will be sent by datanodes. This report is received
    * by SCMDatanodeHeartbeatDispatcher and CommandReport event is generated.
@@ -155,7 +175,7 @@ public final class SCMEvents {
    */
   public static final Event<DeleteBlockCommandStatus>
       DELETE_BLOCK_STATUS =
-      new TypedEvent(DeleteBlockCommandStatus.class,
+      new TypedEvent<>(DeleteBlockCommandStatus.class,
           "DeleteBlockCommandStatus");
 
   /**
@@ -164,7 +184,7 @@ public final class SCMEvents {
    * deleteTransactionID on SCM.
    */
   public static final Event<PendingDeleteStatusList> PENDING_DELETE_STATUS =
-      new TypedEvent(PendingDeleteStatusList.class, "PendingDeleteStatus");
+      new TypedEvent<>(PendingDeleteStatusList.class, "PendingDeleteStatus");
 
   /**
    * This is the command for ReplicationManager to handle under/over

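Both new events plug into the same EventQueue wiring as the existing ones.
Below is a minimal sketch of publish and subscribe, assuming an EventQueue
and a pipeline id at hand; the lambda stands in for the real
PipelineCloseHandler registered in StorageContainerManager further below.

    import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
    import org.apache.hadoop.hdds.scm.events.SCMEvents;
    import org.apache.hadoop.hdds.server.events.EventQueue;

    public final class PipelineCloseWiringSketch {
      public static void wire(EventQueue eventQueue, PipelineID id) {
        // Subscribe: the real subscriber delegates to
        // Mapping#handlePipelineClose(pipelineID).
        eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE,
            (pipelineID, publisher) ->
                System.out.println("closing pipeline " + pipelineID));

        // Publish: StaleNodeHandler fires this for every pipeline hosted
        // on a stale datanode.
        eventQueue.fireEvent(SCMEvents.PIPELINE_CLOSE, id);
      }
    }
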
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
index b37dd93..0bd9339 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
@@ -19,24 +19,36 @@
 package org.apache.hadoop.hdds.scm.node;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 
+import java.util.Set;
+
 /**
  * Handles Stale node event.
  */
 public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
 
   private final Node2ContainerMap node2ContainerMap;
+  private final Mapping containerManager;
 
-  public StaleNodeHandler(Node2ContainerMap node2ContainerMap) {
+  public StaleNodeHandler(Node2ContainerMap node2ContainerMap,
+      Mapping containerManager) {
     this.node2ContainerMap = node2ContainerMap;
+    this.containerManager = containerManager;
   }
 
   @Override
   public void onMessage(DatanodeDetails datanodeDetails,
                         EventPublisher publisher) {
-    //TODO: logic to handle stale node.
+    Set<PipelineID> pipelineIDs =
+        containerManager.getPipelineOnDatanode(datanodeDetails);
+    for (PipelineID id : pipelineIDs) {
+      publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, id);
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
index 4a7fa81..363ce71 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/Node2PipelineMap.java
@@ -21,7 +21,7 @@ package org.apache.hadoop.hdds.scm.pipelines;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 
 import java.util.Collections;
 import java.util.HashSet;
@@ -30,8 +30,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUPLICATE_DATANODE;
-
 /**
  * This data structure maintains the list of pipelines which the given datanode is a part of. This
  * information will be added whenever a new pipeline allocation happens.
@@ -39,7 +37,7 @@ import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.DUP
  * <p>TODO: this information needs to be regenerated from pipeline reports on SCM restart
  */
 public class Node2PipelineMap {
-  private final Map<UUID, Set<Pipeline>> dn2PipelineMap;
+  private final Map<UUID, Set<PipelineID>> dn2PipelineMap;
 
   /** Constructs a Node2PipelineMap Object. */
   public Node2PipelineMap() {
@@ -58,20 +56,6 @@ public class Node2PipelineMap {
   }
 
   /**
-   * Insert a new datanode into Node2Pipeline Map.
-   *
-   * @param datanodeID -- Datanode UUID
-   * @param pipelines - set of pipelines.
-   */
-  private void insertNewDatanode(UUID datanodeID, Set<Pipeline> pipelines) throws SCMException {
-    Preconditions.checkNotNull(pipelines);
-    Preconditions.checkNotNull(datanodeID);
-    if (dn2PipelineMap.putIfAbsent(datanodeID, pipelines) != null) {
-      throw new SCMException("Node already exists in the map", DUPLICATE_DATANODE);
-    }
-  }
-
-  /**
    * Removes datanode Entry from the map.
    *
    * @param datanodeID - Datanode ID.
@@ -87,9 +71,10 @@ public class Node2PipelineMap {
    * @param datanode - UUID
    * @return Set of pipelines or Null.
    */
-  public Set<Pipeline> getPipelines(UUID datanode) {
+  public Set<PipelineID> getPipelines(UUID datanode) {
     Preconditions.checkNotNull(datanode);
-    return dn2PipelineMap.computeIfPresent(datanode, (k, v) -> Collections.unmodifiableSet(v));
+    final Set<PipelineID> s = dn2PipelineMap.get(datanode);
+    return s != null ? Collections.unmodifiableSet(s)
+        : Collections.emptySet();
   }
 
   /**
@@ -100,9 +85,8 @@ public class Node2PipelineMap {
   public synchronized void addPipeline(Pipeline pipeline) {
     for (DatanodeDetails details : pipeline.getDatanodes().values()) {
       UUID dnId = details.getUuid();
-      dn2PipelineMap
-          .computeIfAbsent(dnId, k -> Collections.synchronizedSet(new HashSet<>()))
-          .add(pipeline);
+      dn2PipelineMap.computeIfAbsent(dnId, k -> new HashSet<>())
+          .add(pipeline.getId());
     }
   }
 
@@ -112,13 +96,13 @@ public class Node2PipelineMap {
       dn2PipelineMap.computeIfPresent(
           dnId,
           (k, v) -> {
-            v.remove(pipeline);
+            v.remove(pipeline.getId());
             return v;
           });
     }
   }
 
-  public Map<UUID, Set<Pipeline>> getDn2PipelineMap() {
+  public Map<UUID, Set<PipelineID>> getDn2PipelineMap() {
     return Collections.unmodifiableMap(dn2PipelineMap);
   }
 }

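Since the map now stores PipelineIDs rather than Pipeline objects, callers
resolve each id through PipelineSelector#getPipeline. A hedged sketch of
that lookup path, assuming existing map and selector instances:

    import java.util.Set;
    import java.util.UUID;

    import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
    import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
    import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
    import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;

    public final class Node2PipelineLookupSketch {
      public static void printPipelines(Node2PipelineMap map,
          PipelineSelector selector, UUID dnId) {
        // After this patch getPipelines() never returns null: a datanode
        // with no pipelines yields an empty, unmodifiable set.
        Set<PipelineID> ids = map.getPipelines(dnId);
        for (PipelineID id : ids) {
          // May be null if the pipeline was already closed and removed
          // from the shared pipelineMap.
          Pipeline pipeline = selector.getPipeline(id);
          System.out.println(id + " -> " + pipeline);
        }
      }
    }
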
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java
new file mode 100644
index 0000000..54c2400
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineActionEventHandler.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipelines;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
+    .PipelineActionsFromDatanode;
+
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Handles pipeline actions from datanode.
+ */
+public class PipelineActionEventHandler implements
+    EventHandler<PipelineActionsFromDatanode> {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      PipelineActionEventHandler.class);
+
+  public PipelineActionEventHandler() {
+  }
+
+  @Override
+  public void onMessage(PipelineActionsFromDatanode report,
+      EventPublisher publisher) {
+    for (PipelineAction action : report.getReport().getPipelineActionsList()) {
+      switch (action.getAction()) {
+      case CLOSE:
+        PipelineID pipelineID = PipelineID.
+            getFromProtobuf(action.getClosePipeline().getPipelineID());
+        publisher.fireEvent(SCMEvents.PIPELINE_CLOSE, pipelineID);
+        break;
+      default:
+        LOG.error("unknown pipeline action:{}" + action.getAction());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
new file mode 100644
index 0000000..733dec5
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineCloseHandler.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipelines;
+
+import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+
+/**
+ * Handles pipeline close event.
+ */
+public class PipelineCloseHandler implements EventHandler<PipelineID> {
+  private final Mapping mapping;
+  public PipelineCloseHandler(Mapping mapping) {
+    this.mapping = mapping;
+  }
+
+  @Override
+  public void onMessage(PipelineID pipelineID, EventPublisher publisher) {
+    mapping.handlePipelineClose(pipelineID);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index 5b1a7f7..102df8a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -18,7 +18,6 @@ package org.apache.hadoop.hdds.scm.pipelines;
 
 import java.util.LinkedList;
 import java.util.Map;
-import java.util.WeakHashMap;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
@@ -43,11 +42,12 @@ public abstract class PipelineManager {
   private final AtomicInteger pipelineIndex;
   private final Node2PipelineMap node2PipelineMap;
 
-  public PipelineManager(Node2PipelineMap map) {
+  public PipelineManager(Node2PipelineMap map,
+      Map<PipelineID, Pipeline> pipelineMap) {
     activePipelines = new LinkedList<>();
     pipelineIndex = new AtomicInteger(0);
-    pipelineMap = new WeakHashMap<>();
-    node2PipelineMap = map;
+    this.pipelineMap = pipelineMap;
+    this.node2PipelineMap = map;
   }
 
   /**
@@ -187,7 +187,7 @@ public abstract class PipelineManager {
    *
    * @param pipeline
    */
-  public void closePipeline(Pipeline pipeline) {
+  public void closePipeline(Pipeline pipeline) throws IOException {
     pipelineMap.remove(pipeline.getId());
     node2PipelineMap.removePipeline(pipeline);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index b02beb3..63afbaa 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -55,6 +55,8 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.NavigableSet;
 import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -77,6 +79,7 @@ public class PipelineSelector {
   private final StandaloneManagerImpl standaloneManager;
   private final long containerSize;
   private final Node2PipelineMap node2PipelineMap;
+  private final Map<PipelineID, Pipeline> pipelineMap;
   private final LeaseManager<Pipeline> pipelineLeaseManager;
   private final StateMachine<LifeCycleState,
       HddsProtos.LifeCycleEvent> stateMachine;
@@ -99,12 +102,13 @@ public class PipelineSelector {
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT,
         StorageUnit.BYTES);
     node2PipelineMap = new Node2PipelineMap();
+    pipelineMap = new ConcurrentHashMap<>();
     this.standaloneManager =
         new StandaloneManagerImpl(this.nodeManager, placementPolicy,
-            containerSize, node2PipelineMap);
+            containerSize, node2PipelineMap, pipelineMap);
     this.ratisManager =
         new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
-            conf, node2PipelineMap);
+            conf, node2PipelineMap, pipelineMap);
     // Initialize the container state machine.
     Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
     long pipelineCreationLeaseTimeout = conf.getTimeDuration(
@@ -303,19 +307,10 @@ public class PipelineSelector {
   }
 
   /**
-   * This function to return pipeline for given pipeline name and replication
-   * type.
+   * Returns the pipeline for the given pipeline id.
    */
-  public Pipeline getPipeline(PipelineID pipelineID,
-      ReplicationType replicationType) throws IOException {
-    if (pipelineID == null) {
-      return null;
-    }
-    PipelineManager manager = getPipelineManager(replicationType);
-    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    LOG.debug("Getting replication pipeline forReplicationType {} :" +
-        " pipelineName:{}", replicationType, pipelineID);
-    return manager.getPipeline(pipelineID);
+  public Pipeline getPipeline(PipelineID pipelineID) {
+    return pipelineMap.get(pipelineID);
   }
 
   /**
@@ -324,9 +319,18 @@ public class PipelineSelector {
   public void finalizePipeline(Pipeline pipeline) throws IOException {
     PipelineManager manager = getPipelineManager(pipeline.getType());
     Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    LOG.debug("Finalizing pipeline. pipelineID: {}", pipeline.getId());
+    if (pipeline.getLifeCycleState() == LifeCycleState.CLOSING ||
+        pipeline.getLifeCycleState() == LifeCycleState.CLOSED) {
+      LOG.debug("pipeline:{} already in closing state, skipping",
+          pipeline.getId());
+      // already in closing/closed state
+      return;
+    }
+
     // Remove the pipeline from active allocation
     manager.finalizePipeline(pipeline);
+
+    LOG.info("Finalizing pipeline. pipelineID: {}", pipeline.getId());
     updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.FINALIZE);
     closePipelineIfNoOpenContainers(pipeline);
   }
@@ -350,7 +354,7 @@ public class PipelineSelector {
   /**
    * Close a given pipeline.
    */
-  private void closePipeline(Pipeline pipeline) {
+  private void closePipeline(Pipeline pipeline) throws IOException {
     PipelineManager manager = getPipelineManager(pipeline.getType());
     Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
     LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getId());
@@ -400,14 +404,8 @@ public class PipelineSelector {
     return node2PipelineMap;
   }
 
-  public void removePipeline(UUID dnId) {
-    Set<Pipeline> pipelineSet =
-        node2PipelineMap.getPipelines(dnId);
-    for (Pipeline pipeline : pipelineSet) {
-      getPipelineManager(pipeline.getType())
-          .closePipeline(pipeline);
-    }
-    node2PipelineMap.removeDatanode(dnId);
+  public Set<PipelineID> getPipelineId(UUID dnId) {
+    return node2PipelineMap.getPipelines(dnId);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index 8b14483..150802e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -39,6 +39,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.Map;
 
 /**
  * Implementation of {@link PipelineManager}.
@@ -59,8 +60,8 @@ public class RatisManagerImpl extends PipelineManager {
    */
   public RatisManagerImpl(NodeManager nodeManager,
       ContainerPlacementPolicy placementPolicy, long size, Configuration conf,
-      Node2PipelineMap map) {
-    super(map);
+      Node2PipelineMap map, Map<PipelineID, Pipeline> pipelineMap) {
+    super(map, pipelineMap);
     this.conf = conf;
     this.nodeManager = nodeManager;
     ratisMembers = new HashSet<>();
@@ -101,20 +102,23 @@ public class RatisManagerImpl extends PipelineManager {
     //TODO:move the initialization from SCM to client
     try (XceiverClientRatis client =
         XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
-      client.createPipeline(pipeline);
+      client.createPipeline();
     }
   }
 
   /**
    * Close the pipeline.
    */
-  public void closePipeline(Pipeline pipeline) {
+  public void closePipeline(Pipeline pipeline) throws IOException {
     super.closePipeline(pipeline);
     for (DatanodeDetails node : pipeline.getMachines()) {
       // A node should always be in the ratis members list.
       Preconditions.checkArgument(ratisMembers.remove(node));
     }
-    //TODO: should the raft ring also be destroyed as well?
+    try (XceiverClientRatis client =
+        XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
+      client.destroyPipeline();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index f1b23f5..2573b9c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -37,6 +37,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.Map;
 
 /**
  * Standalone Manager Impl to prove that pluggable interface
@@ -58,8 +59,8 @@ public class StandaloneManagerImpl extends PipelineManager {
    */
   public StandaloneManagerImpl(NodeManager nodeManager,
       ContainerPlacementPolicy placementPolicy, long containerSize,
-      Node2PipelineMap map) {
-    super(map);
+      Node2PipelineMap map, Map<PipelineID, Pipeline> pipelineMap) {
+    super(map, pipelineMap);
     this.nodeManager = nodeManager;
     this.placementPolicy = placementPolicy;
     this.containerSize =  containerSize;
@@ -103,7 +104,7 @@ public class StandaloneManagerImpl extends PipelineManager {
   /**
    * Close the pipeline.
    */
-  public void closePipeline(Pipeline pipeline) {
+  public void closePipeline(Pipeline pipeline) throws IOException {
     super.closePipeline(pipeline);
     for (DatanodeDetails node : pipeline.getMachines()) {
       // A node should always be in the standalone members list.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index c259141..a651f62 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdds.scm.server;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
@@ -43,6 +45,8 @@ import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_ACTIONS;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CONTAINER_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.NODE_REPORT;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.CMD_STATUS_REPORT;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.PIPELINE_ACTIONS;
+
 /**
  * This class is responsible for dispatching heartbeat from datanode to
  * appropriate EventHandler at SCM.
@@ -99,6 +103,13 @@ public final class SCMDatanodeHeartbeatDispatcher {
               heartbeat.getContainerActions()));
     }
 
+    if (heartbeat.hasPipelineActions()) {
+      LOG.debug("Dispatching Pipeline Actions.");
+      eventPublisher.fireEvent(PIPELINE_ACTIONS,
+          new PipelineActionsFromDatanode(datanodeDetails,
+              heartbeat.getPipelineActions()));
+    }
+
     if (heartbeat.hasCommandStatusReport()) {
       eventPublisher.fireEvent(CMD_STATUS_REPORT,
           new CommandStatusReportFromDatanode(datanodeDetails,
@@ -168,6 +179,18 @@ public final class SCMDatanodeHeartbeatDispatcher {
   }
 
   /**
+   * Pipeline action event payload with origin.
+   */
+  public static class PipelineActionsFromDatanode
+      extends ReportFromDatanode<PipelineActionsProto> {
+
+    public PipelineActionsFromDatanode(DatanodeDetails datanodeDetails,
+        PipelineActionsProto actions) {
+      super(datanodeDetails, actions);
+    }
+  }
+
+  /**
    * Container report event payload with origin.
    */
   public static class CommandStatusReportFromDatanode

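End to end, a heartbeat carrying pipeline actions is split out into a
PIPELINE_ACTIONS event by the hunk above. A hedged sketch of that path,
assuming the dispatcher's dispatch(SCMHeartbeatRequestProto) entry point;
datanodeDetailsProto and actions are placeholders for real values.

    import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.PipelineActionsProto;
    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
    import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;

    public final class HeartbeatDispatchSketch {
      public static void send(SCMDatanodeHeartbeatDispatcher dispatcher,
          HddsProtos.DatanodeDetailsProto datanodeDetailsProto,
          PipelineActionsProto actions) {
        SCMHeartbeatRequestProto heartbeat =
            SCMHeartbeatRequestProto.newBuilder()
                .setDatanodeDetails(datanodeDetailsProto)
                .setPipelineActions(actions)
                .build();
        // hasPipelineActions() is now true, so dispatch() fires
        // PIPELINE_ACTIONS with a PipelineActionsFromDatanode payload;
        // PipelineActionEventHandler then fans each CLOSE action out
        // into a PIPELINE_CLOSE event.
        dispatcher.dispatch(heartbeat);
      }
    }
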
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 061ff78..b84f399 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -62,6 +62,8 @@ import org.apache.hadoop.hdds.scm.node.NodeReportHandler;
 import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
 import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
 import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineCloseHandler;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineActionEventHandler;
 import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
 import org.apache.hadoop.hdds.server.events.EventQueue;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -218,7 +220,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         new CommandStatusReportHandler();
 
     NewNodeHandler newNodeHandler = new NewNodeHandler(node2ContainerMap);
-    StaleNodeHandler staleNodeHandler = new StaleNodeHandler(node2ContainerMap);
+    StaleNodeHandler staleNodeHandler =
+        new StaleNodeHandler(node2ContainerMap, scmContainerManager);
     DeadNodeHandler deadNodeHandler = new DeadNodeHandler(node2ContainerMap,
         getScmContainerManager().getStateManager());
     ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
@@ -229,6 +232,11 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         new ContainerReportHandler(scmContainerManager, node2ContainerMap,
             replicationStatus);
 
+    PipelineActionEventHandler pipelineActionEventHandler =
+        new PipelineActionEventHandler();
+
+    PipelineCloseHandler pipelineCloseHandler =
+        new PipelineCloseHandler(scmContainerManager);
 
     eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
     eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
@@ -242,6 +250,9 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
     eventQueue.addHandler(SCMEvents.START_REPLICATION, replicationStatus);
     eventQueue
         .addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
+    eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
+        pipelineActionEventHandler);
+    eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler);
 
     long watcherTimeout =
         conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b3161c4d/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index c0cd293..b8cb9970 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerMapping;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .ContainerWithPipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
@@ -97,10 +97,10 @@ public class TestNode2PipelineMap {
     Assert.assertEquals(3, dns.size());
 
     // get pipeline details by dnid
-    Set<Pipeline> pipelines = mapping.getPipelineSelector()
+    Set<PipelineID> pipelines = mapping.getPipelineSelector()
         .getNode2PipelineMap().getPipelines(dns.get(0).getUuid());
     Assert.assertEquals(1, pipelines.size());
-    pipelines.forEach(p -> Assert.assertEquals(p.getId(),
+    pipelines.forEach(p -> Assert.assertEquals(p,
         ratisContainer.getPipeline().getId()));
 
 

