Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/07/20 21:38:12 UTC

[1/3] hadoop git commit: Revert "HDDS-239. Add PipelineStateManager to track pipeline state transition. Contributed by Mukul Kumar Singh."

Repository: hadoop
Updated Branches:
  refs/heads/trunk c7ae55675 -> 347c95501


Revert "HDDS-239. Add PipelineStateManager to track pipeline state transition. Contributed by Mukul Kumar Singh."

This reverts commit 6837121a43231f854b0b22ad20330012439313ce (it was mixed in with HDDS-260).


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d2acf8d5
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d2acf8d5
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d2acf8d5

Branch: refs/heads/trunk
Commit: d2acf8d560950f06ffbf5c217fbfab76cd70d5da
Parents: c7ae556
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Fri Jul 20 14:20:18 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Fri Jul 20 14:20:18 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdds/scm/ScmConfigKeys.java   |   5 -
 .../scm/container/common/helpers/Pipeline.java  |   7 -
 .../common/src/main/resources/ozone-default.xml |  12 -
 .../common/statemachine/StateContext.java       |  52 +---
 .../states/endpoint/HeartbeatEndpointTask.java  |  24 +-
 .../StorageContainerDatanodeProtocol.proto      |   4 +-
 .../common/report/TestReportPublisher.java      |  41 +++
 .../endpoint/TestHeartbeatEndpointTask.java     | 302 -------------------
 .../common/states/endpoint/package-info.java    |  18 --
 .../hdds/scm/container/ContainerMapping.java    |   4 -
 .../hdds/scm/exceptions/SCMException.java       |   1 -
 .../hdds/scm/pipelines/PipelineManager.java     |  64 ++--
 .../hdds/scm/pipelines/PipelineSelector.java    | 212 ++-----------
 .../scm/pipelines/ratis/RatisManagerImpl.java   |  33 +-
 .../standalone/StandaloneManagerImpl.java       |  21 +-
 .../hdds/scm/pipeline/TestNode2PipelineMap.java |  14 -
 16 files changed, 146 insertions(+), 668 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 6e940ad..71184cf 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -236,11 +236,6 @@ public final class ScmConfigKeys {
   public static final String
       OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
 
-  public static final String OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT =
-      "ozone.scm.pipeline.creation.lease.timeout";
-
-  public static final String
-      OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
 
   public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
       "ozone.scm.block.deletion.max.retry";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index 534c9fd..c5794f4 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -214,13 +214,6 @@ public class Pipeline {
   }
 
   /**
-   * Update the State of the pipeline.
-   */
-  public void setLifeCycleState(HddsProtos.LifeCycleState nextState) {
-     lifeCycleState = nextState;
-  }
-
-  /**
    * Gets the pipeline Name.
    *
    * @return - Name of the pipeline

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 69a382a..5a1d26a 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1085,17 +1085,5 @@
       executed since last report. Unit could be defined with
       postfix (ns,ms,s,m,h,d)</description>
   </property>
-  <property>
-    <name>ozone.scm.pipeline.creation.lease.timeout</name>
-    <value>60s</value>
-    <tag>OZONE, SCM, PIPELINE</tag>
-    <description>
-      Pipeline creation timeout in milliseconds to be used by SCM. When
-      BEGIN_CREATE event happens the pipeline is moved from ALLOCATED to
-      CREATING state, SCM will now wait for the configured amount of time
-      to get COMPLETE_CREATE event if it doesn't receive it will move the
-      pipeline to DELETING.
-    </description>
-  </property>
 
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 4951f2a..faaff69 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -20,18 +20,14 @@ import com.google.protobuf.GeneratedMessage;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerAction;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode
     .InitDatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode
     .RunningDatanodeState;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
-import org.apache.hadoop.ozone.protocol.commands.CommandStatus
-    .CommandStatusBuilder;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -47,7 +43,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
 
@@ -64,7 +59,6 @@ public class StateContext {
   private final AtomicLong stateExecutionCount;
   private final Configuration conf;
   private final Queue<GeneratedMessage> reports;
-  private final Queue<ContainerAction> containerActions;
   private DatanodeStateMachine.DatanodeStates state;
 
   /**
@@ -82,7 +76,6 @@ public class StateContext {
     commandQueue = new LinkedList<>();
     cmdStatusMap = new ConcurrentHashMap<>();
     reports = new LinkedList<>();
-    containerActions = new LinkedList<>();
     lock = new ReentrantLock();
     stateExecutionCount = new AtomicLong(0);
   }
@@ -205,47 +198,6 @@ public class StateContext {
     return results;
   }
 
-
-  /**
-   * Adds the ContainerAction to ContainerAction queue.
-   *
-   * @param containerAction ContainerAction to be added
-   */
-  public void addContainerAction(ContainerAction containerAction) {
-    synchronized (containerActions) {
-      containerActions.add(containerAction);
-    }
-  }
-
-  /**
-   * Returns all the pending ContainerActions from the ContainerAction queue,
-   * or empty list if the queue is empty.
-   *
-   * @return List<ContainerAction>
-   */
-  public List<ContainerAction> getAllPendingContainerActions() {
-    return getPendingContainerAction(Integer.MAX_VALUE);
-  }
-
-  /**
-   * Returns pending ContainerActions from the ContainerAction queue with a
-   * max limit on list size, or empty list if the queue is empty.
-   *
-   * @return List<ContainerAction>
-   */
-  public List<ContainerAction> getPendingContainerAction(int maxLimit) {
-    List<ContainerAction> results = new ArrayList<>();
-    synchronized (containerActions) {
-      containerActions.parallelStream().limit(maxLimit).collect(Collectors.toList());
-      ContainerAction action = containerActions.poll();
-      while(results.size() < maxLimit && action != null) {
-        results.add(action);
-        action = containerActions.poll();
-      }
-    }
-    return results;
-  }
-
   /**
    * Returns the next task to get executed by the datanode state machine.
    * @return A callable that will be executed by the
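
Worth noting: the removed getPendingContainerAction() above carried a stray no-op, the parallelStream().limit().collect() call whose result was discarded before the poll loop did the actual draining. A minimal corrected sketch of the same bounded-drain pattern (hypothetical class, plain JDK only):

    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;

    final class BoundedDrainSketch<T> {
      private final Queue<T> queue = new LinkedList<>();

      synchronized void add(T item) {
        queue.add(item);
      }

      // Drains at most maxLimit items; returns an empty list if none pending.
      synchronized List<T> drain(int maxLimit) {
        List<T> results = new ArrayList<>();
        T item;
        while (results.size() < maxLimit && (item = queue.poll()) != null) {
          results.add(item);
        }
        return results;
      }
    }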

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 214e1cd..260a245 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -25,10 +25,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerAction;
-import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
@@ -50,7 +46,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.ZonedDateTime;
-import java.util.List;
 import java.util.concurrent.Callable;
 
 /**
@@ -112,7 +107,7 @@ public class HeartbeatEndpointTask
           SCMHeartbeatRequestProto.newBuilder()
               .setDatanodeDetails(datanodeDetailsProto);
       addReports(requestBuilder);
-      addContainerActions(requestBuilder);
+
       SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
           .sendHeartbeat(requestBuilder.build());
       processResponse(reponse, datanodeDetailsProto);
@@ -145,23 +140,6 @@ public class HeartbeatEndpointTask
   }
 
   /**
-   * Adds all the pending ContainerActions to the heartbeat.
-   *
-   * @param requestBuilder builder to which the report has to be added.
-   */
-  private void addContainerActions(
-      SCMHeartbeatRequestProto.Builder requestBuilder) {
-    List<ContainerAction> actions = context.getAllPendingContainerActions();
-    if (!actions.isEmpty()) {
-      ContainerActionsProto cap = ContainerActionsProto.newBuilder()
-          .addAllContainerActions(actions)
-          .build();
-      requestBuilder.setContainerActions(cap);
-    }
-  }
-
-
-  /**
    * Returns a builder class for HeartbeatEndpointTask task.
    * @return   Builder.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index d89567b..4238389 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -79,8 +79,8 @@ message SCMHeartbeatRequestProto {
   required DatanodeDetailsProto datanodeDetails = 1;
   optional NodeReportProto nodeReport = 2;
   optional ContainerReportsProto containerReport = 3;
-  optional CommandStatusReportsProto commandStatusReport = 4;
-  optional ContainerActionsProto containerActions = 5;
+  optional ContainerActionsProto containerActions = 4;
+  optional CommandStatusReportsProto commandStatusReport = 5;
 }
 
 /*
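
The hunk above swaps the field numbers of commandStatusReport and containerActions. In protobuf, field numbers (not declaration order) define the wire format, so a renumbering like this is only safe while no serialized heartbeats cross the two versions. A quick check against the generated class, assuming it is on the classpath:

    import com.google.protobuf.Descriptors;

    import org.apache.hadoop.hdds.protocol.proto
        .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;

    final class FieldTagCheck {
      public static void main(String[] args) {
        // After this revert, wire tag 4 maps back to containerActions.
        Descriptors.FieldDescriptor f4 = SCMHeartbeatRequestProto
            .getDescriptor().findFieldByNumber(4);
        System.out.println(f4.getName());  // prints "containerActions"
      }
    }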

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index 811599f..a0db2e8 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.container.common.report;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Descriptors;
 import com.google.protobuf.GeneratedMessage;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -27,8 +28,14 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -171,6 +178,22 @@ public class TestReportPublisher {
     executorService.shutdown();
   }
 
+  @Test
+  public void testAddingReportToHeartbeat() {
+    GeneratedMessage nodeReport = NodeReportProto.getDefaultInstance();
+    GeneratedMessage containerReport = ContainerReportsProto
+        .getDefaultInstance();
+    SCMHeartbeatRequestProto.Builder heartbeatBuilder =
+        SCMHeartbeatRequestProto.newBuilder();
+    heartbeatBuilder.setDatanodeDetails(
+        getDatanodeDetails().getProtoBufMessage());
+    addReport(heartbeatBuilder, nodeReport);
+    addReport(heartbeatBuilder, containerReport);
+    SCMHeartbeatRequestProto heartbeat = heartbeatBuilder.build();
+    Assert.assertTrue(heartbeat.hasNodeReport());
+    Assert.assertTrue(heartbeat.hasContainerReport());
+  }
+
   /**
    * Get a datanode details.
    *
@@ -199,4 +222,22 @@ public class TestReportPublisher {
     return builder.build();
   }
 
+  /**
+   * Adds the report to heartbeat.
+   *
+   * @param requestBuilder builder to which the report has to be added.
+   * @param report         the report to be added.
+   */
+  private static void addReport(SCMHeartbeatRequestProto.Builder
+      requestBuilder, GeneratedMessage report) {
+    String reportName = report.getDescriptorForType().getFullName();
+    for (Descriptors.FieldDescriptor descriptor :
+        SCMHeartbeatRequestProto.getDescriptor().getFields()) {
+      String heartbeatFieldName = descriptor.getMessageType().getFullName();
+      if (heartbeatFieldName.equals(reportName)) {
+        requestBuilder.setField(descriptor, report);
+      }
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
deleted file mode 100644
index 87bd811..0000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ /dev/null
@@ -1,302 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.states.endpoint;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerAction;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .DatanodeStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .DatanodeStateMachine.DatanodeStates;
-import org.apache.hadoop.ozone.container.common.statemachine
-    .EndpointStateMachine;
-import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.ozone.protocolPB
-    .StorageContainerDatanodeProtocolClientSideTranslatorPB;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Mockito;
-
-import java.util.UUID;
-
-import static org.mockito.ArgumentMatchers.any;
-
-/**
- * This class tests the functionality of HeartbeatEndpointTask.
- */
-public class TestHeartbeatEndpointTask {
-
-
-  @Test
-  public void testheartbeatWithoutReports() throws Exception {
-    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
-        Mockito.mock(
-            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
-    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
-        .forClass(SCMHeartbeatRequestProto.class);
-    Mockito.when(scm.sendHeartbeat(argument.capture()))
-        .thenAnswer(invocation ->
-            SCMHeartbeatResponseProto.newBuilder()
-                .setDatanodeUUID(
-                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
-                        .getDatanodeDetails().getUuid())
-                .build());
-
-    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(scm);
-    endpointTask.call();
-    SCMHeartbeatRequestProto heartbeat = argument.getValue();
-    Assert.assertTrue(heartbeat.hasDatanodeDetails());
-    Assert.assertFalse(heartbeat.hasNodeReport());
-    Assert.assertFalse(heartbeat.hasContainerReport());
-    Assert.assertFalse(heartbeat.hasCommandStatusReport());
-    Assert.assertFalse(heartbeat.hasContainerActions());
-  }
-
-  @Test
-  public void testheartbeatWithNodeReports() throws Exception {
-    Configuration conf = new OzoneConfiguration();
-    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
-        Mockito.mock(DatanodeStateMachine.class));
-
-    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
-        Mockito.mock(
-            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
-    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
-        .forClass(SCMHeartbeatRequestProto.class);
-    Mockito.when(scm.sendHeartbeat(argument.capture()))
-        .thenAnswer(invocation ->
-            SCMHeartbeatResponseProto.newBuilder()
-                .setDatanodeUUID(
-                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
-                        .getDatanodeDetails().getUuid())
-                .build());
-
-    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
-        conf, context, scm);
-    context.addReport(NodeReportProto.getDefaultInstance());
-    endpointTask.call();
-    SCMHeartbeatRequestProto heartbeat = argument.getValue();
-    Assert.assertTrue(heartbeat.hasDatanodeDetails());
-    Assert.assertTrue(heartbeat.hasNodeReport());
-    Assert.assertFalse(heartbeat.hasContainerReport());
-    Assert.assertFalse(heartbeat.hasCommandStatusReport());
-    Assert.assertFalse(heartbeat.hasContainerActions());
-  }
-
-  @Test
-  public void testheartbeatWithContainerReports() throws Exception {
-    Configuration conf = new OzoneConfiguration();
-    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
-        Mockito.mock(DatanodeStateMachine.class));
-
-    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
-        Mockito.mock(
-            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
-    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
-        .forClass(SCMHeartbeatRequestProto.class);
-    Mockito.when(scm.sendHeartbeat(argument.capture()))
-        .thenAnswer(invocation ->
-            SCMHeartbeatResponseProto.newBuilder()
-                .setDatanodeUUID(
-                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
-                        .getDatanodeDetails().getUuid())
-                .build());
-
-    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
-        conf, context, scm);
-    context.addReport(ContainerReportsProto.getDefaultInstance());
-    endpointTask.call();
-    SCMHeartbeatRequestProto heartbeat = argument.getValue();
-    Assert.assertTrue(heartbeat.hasDatanodeDetails());
-    Assert.assertFalse(heartbeat.hasNodeReport());
-    Assert.assertTrue(heartbeat.hasContainerReport());
-    Assert.assertFalse(heartbeat.hasCommandStatusReport());
-    Assert.assertFalse(heartbeat.hasContainerActions());
-  }
-
-  @Test
-  public void testheartbeatWithCommandStatusReports() throws Exception {
-    Configuration conf = new OzoneConfiguration();
-    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
-        Mockito.mock(DatanodeStateMachine.class));
-
-    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
-        Mockito.mock(
-            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
-    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
-        .forClass(SCMHeartbeatRequestProto.class);
-    Mockito.when(scm.sendHeartbeat(argument.capture()))
-        .thenAnswer(invocation ->
-            SCMHeartbeatResponseProto.newBuilder()
-                .setDatanodeUUID(
-                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
-                        .getDatanodeDetails().getUuid())
-                .build());
-
-    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
-        conf, context, scm);
-    context.addReport(CommandStatusReportsProto.getDefaultInstance());
-    endpointTask.call();
-    SCMHeartbeatRequestProto heartbeat = argument.getValue();
-    Assert.assertTrue(heartbeat.hasDatanodeDetails());
-    Assert.assertFalse(heartbeat.hasNodeReport());
-    Assert.assertFalse(heartbeat.hasContainerReport());
-    Assert.assertTrue(heartbeat.hasCommandStatusReport());
-    Assert.assertFalse(heartbeat.hasContainerActions());
-  }
-
-  @Test
-  public void testheartbeatWithContainerActions() throws Exception {
-    Configuration conf = new OzoneConfiguration();
-    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
-        Mockito.mock(DatanodeStateMachine.class));
-
-    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
-        Mockito.mock(
-            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
-    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
-        .forClass(SCMHeartbeatRequestProto.class);
-    Mockito.when(scm.sendHeartbeat(argument.capture()))
-        .thenAnswer(invocation ->
-            SCMHeartbeatResponseProto.newBuilder()
-                .setDatanodeUUID(
-                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
-                        .getDatanodeDetails().getUuid())
-                .build());
-
-    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
-        conf, context, scm);
-    context.addContainerAction(getContainerAction());
-    endpointTask.call();
-    SCMHeartbeatRequestProto heartbeat = argument.getValue();
-    Assert.assertTrue(heartbeat.hasDatanodeDetails());
-    Assert.assertFalse(heartbeat.hasNodeReport());
-    Assert.assertFalse(heartbeat.hasContainerReport());
-    Assert.assertFalse(heartbeat.hasCommandStatusReport());
-    Assert.assertTrue(heartbeat.hasContainerActions());
-  }
-
-  @Test
-  public void testheartbeatWithAllReports() throws Exception {
-    Configuration conf = new OzoneConfiguration();
-    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
-        Mockito.mock(DatanodeStateMachine.class));
-
-    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
-        Mockito.mock(
-            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
-    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
-        .forClass(SCMHeartbeatRequestProto.class);
-    Mockito.when(scm.sendHeartbeat(argument.capture()))
-        .thenAnswer(invocation ->
-            SCMHeartbeatResponseProto.newBuilder()
-                .setDatanodeUUID(
-                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
-                        .getDatanodeDetails().getUuid())
-                .build());
-
-    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
-        conf, context, scm);
-    context.addReport(NodeReportProto.getDefaultInstance());
-    context.addReport(ContainerReportsProto.getDefaultInstance());
-    context.addReport(CommandStatusReportsProto.getDefaultInstance());
-    context.addContainerAction(getContainerAction());
-    endpointTask.call();
-    SCMHeartbeatRequestProto heartbeat = argument.getValue();
-    Assert.assertTrue(heartbeat.hasDatanodeDetails());
-    Assert.assertTrue(heartbeat.hasNodeReport());
-    Assert.assertTrue(heartbeat.hasContainerReport());
-    Assert.assertTrue(heartbeat.hasCommandStatusReport());
-    Assert.assertTrue(heartbeat.hasContainerActions());
-  }
-
-  /**
-   * Creates HeartbeatEndpointTask for the given StorageContainerManager proxy.
-   *
-   * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
-   *
-   * @return HeartbeatEndpointTask
-   */
-  private HeartbeatEndpointTask getHeartbeatEndpointTask(
-      StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
-    Configuration conf = new OzoneConfiguration();
-    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
-        Mockito.mock(DatanodeStateMachine.class));
-    return getHeartbeatEndpointTask(conf, context, proxy);
-
-  }
-
-  /**
-   * Creates HeartbeatEndpointTask with the given conf, context and
-   * StorageContainerManager client side proxy.
-   *
-   * @param conf Configuration
-   * @param context StateContext
-   * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
-   *
-   * @return HeartbeatEndpointTask
-   */
-  private HeartbeatEndpointTask getHeartbeatEndpointTask(
-      Configuration conf,
-      StateContext context,
-      StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
-    DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder()
-        .setUuid(UUID.randomUUID().toString())
-        .setHostName("localhost")
-        .setIpAddress("127.0.0.1")
-        .build();
-    EndpointStateMachine endpointStateMachine = Mockito
-        .mock(EndpointStateMachine.class);
-    Mockito.when(endpointStateMachine.getEndPoint()).thenReturn(proxy);
-    return HeartbeatEndpointTask.newBuilder()
-        .setConfig(conf)
-        .setDatanodeDetails(datanodeDetails)
-        .setContext(context)
-        .setEndpointStateMachine(endpointStateMachine)
-        .build();
-  }
-
-  private ContainerAction getContainerAction() {
-    ContainerAction.Builder builder = ContainerAction.newBuilder();
-    ContainerInfo containerInfo = ContainerInfo.newBuilder()
-        .setContainerID(1L)
-        .build();
-    builder.setContainer(containerInfo)
-        .setAction(ContainerAction.Action.CLOSE)
-        .setReason(ContainerAction.Reason.CONTAINER_FULL);
-    return builder.build();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
deleted file mode 100644
index d120a5c..0000000
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.states.endpoint;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index f07d22b..26f4d86 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -658,10 +658,6 @@ public class ContainerMapping implements Mapping {
     if (containerStore != null) {
       containerStore.close();
     }
-
-    if (pipelineSelector != null) {
-      pipelineSelector.shutdown();
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
index 0085542..d7d70ef 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
@@ -107,7 +107,6 @@ public class SCMException extends IOException {
     FAILED_TO_LOAD_OPEN_CONTAINER,
     FAILED_TO_ALLOCATE_CONTAINER,
     FAILED_TO_CHANGE_CONTAINER_STATE,
-    FAILED_TO_CHANGE_PIPELINE_STATE,
     CONTAINER_EXISTS,
     FAILED_TO_FIND_CONTAINER,
     FAILED_TO_FIND_CONTAINER_WITH_SPACE,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index 77d8211..a041973 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -59,16 +59,41 @@ public abstract class PipelineManager {
    * @return a Pipeline.
    */
   public synchronized final Pipeline getPipeline(
-      ReplicationFactor replicationFactor, ReplicationType replicationType) {
-    Pipeline pipeline = findOpenPipeline(replicationType, replicationFactor);
+      ReplicationFactor replicationFactor, ReplicationType replicationType)
+      throws IOException {
+    /**
+     * In the Ozone world, we have a very simple policy.
+     *
+     * 1. Try to create a pipeline if there are enough free nodes.
+     *
+     * 2. This allows all nodes to part of a pipeline quickly.
+     *
+     * 3. if there are not enough free nodes, return pipeline in a
+     * round-robin fashion.
+     *
+     * TODO: Might have to come up with a better algorithm than this.
+     * Create a new placement policy that returns pipelines in round robin
+     * fashion.
+     */
+    Pipeline pipeline = allocatePipeline(replicationFactor);
     if (pipeline != null) {
-      LOG.debug("re-used pipeline:{} for container with " +
+      LOG.debug("created new pipeline:{} for container with " +
               "replicationType:{} replicationFactor:{}",
           pipeline.getPipelineName(), replicationType, replicationFactor);
+      activePipelines.add(pipeline);
+      activePipelineMap.put(pipeline.getPipelineName(), pipeline);
+      node2PipelineMap.addPipeline(pipeline);
+    } else {
+      pipeline = findOpenPipeline(replicationType, replicationFactor);
+      if (pipeline != null) {
+        LOG.debug("re-used pipeline:{} for container with " +
+                "replicationType:{} replicationFactor:{}",
+            pipeline.getPipelineName(), replicationType, replicationFactor);
+      }
     }
     if (pipeline == null) {
       LOG.error("Get pipeline call failed. We are not able to find" +
-              " operational pipeline.");
+              "free nodes or operational pipeline.");
       return null;
     } else {
       return pipeline;
@@ -84,7 +109,7 @@ public abstract class PipelineManager {
   public synchronized final Pipeline getPipeline(String pipelineName) {
     Pipeline pipeline = null;
 
-    // 1. Check if pipeline already exists
+    // 1. Check if pipeline channel already exists
     if (activePipelineMap.containsKey(pipelineName)) {
       pipeline = activePipelineMap.get(pipelineName);
       LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
@@ -107,13 +132,7 @@ public abstract class PipelineManager {
   }
 
   public abstract Pipeline allocatePipeline(
-      ReplicationFactor replicationFactor);
-
-  /**
-   * Initialize the pipeline
-   * TODO: move the initialization to Ozone Client later
-   */
-  public abstract void initializePipeline(Pipeline pipeline) throws IOException;
+      ReplicationFactor replicationFactor) throws IOException;
 
   public void removePipeline(Pipeline pipeline) {
     activePipelines.remove(pipeline);
@@ -160,23 +179,12 @@ public abstract class PipelineManager {
   }
 
   /**
-   * Creates a pipeline with a specified replication factor and type.
-   * @param replicationFactor - Replication Factor.
-   * @param replicationType - Replication Type.
+   * Creates a pipeline from a specified set of Nodes.
+   * @param pipelineID - Name of the pipeline
+   * @param datanodes - The list of datanodes that make this pipeline.
    */
-  public Pipeline createPipeline(ReplicationFactor replicationFactor,
-      ReplicationType replicationType) throws IOException {
-    Pipeline pipeline = allocatePipeline(replicationFactor);
-    if (pipeline != null) {
-      LOG.debug("created new pipeline:{} for container with "
-              + "replicationType:{} replicationFactor:{}",
-          pipeline.getPipelineName(), replicationType, replicationFactor);
-      activePipelines.add(pipeline);
-      activePipelineMap.put(pipeline.getPipelineName(), pipeline);
-      node2PipelineMap.addPipeline(pipeline);
-    }
-    return pipeline;
-  }
+  public abstract void createPipeline(String pipelineID,
+      List<DatanodeDetails> datanodes) throws IOException;
 
   /**
    * Close the  pipeline with the given clusterId.
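
The restored getPipeline() above implements the policy spelled out in its comment block: try to allocate a new pipeline from free nodes first, and only fall back to reusing an open pipeline. Reduced to its shape (hypothetical names, not the real PipelineManager members):

    final class SelectionPolicySketch {
      interface Allocator { String allocate(); }  // null when no free nodes
      interface OpenSet { String findOpen(); }    // round-robin over open pipelines

      static String getPipeline(Allocator allocator, OpenSet openSet) {
        String pipeline = allocator.allocate();   // 1. prefer a new pipeline
        if (pipeline == null) {
          pipeline = openSet.findOpen();          // 2. otherwise reuse an open one
        }
        // Null only if there are no free nodes and no open pipelines.
        return pipeline;
      }
    }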

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index 08710e7..2955af5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .SCMContainerPlacementRandom;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
 import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
@@ -34,28 +33,17 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.statemachine
-    .InvalidStateTransitionException;
-import org.apache.hadoop.ozone.common.statemachine.StateMachine;
-import org.apache.hadoop.ozone.lease.Lease;
-import org.apache.hadoop.ozone.lease.LeaseException;
-import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_CHANGE_PIPELINE_STATE;
-
 /**
  * Sends the request to the right pipeline manager.
  */
@@ -69,10 +57,6 @@ public class PipelineSelector {
   private final StandaloneManagerImpl standaloneManager;
   private final long containerSize;
   private final Node2PipelineMap node2PipelineMap;
-  private final LeaseManager<Pipeline> pipelineLeaseManager;
-  private final StateMachine<LifeCycleState,
-      HddsProtos.LifeCycleEvent> stateMachine;
-
   /**
    * Constructs a pipeline Selector.
    *
@@ -93,74 +77,6 @@ public class PipelineSelector {
     this.ratisManager =
         new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
             conf, node2PipelineMap);
-    // Initialize the container state machine.
-    Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
-    long pipelineCreationLeaseTimeout = conf.getTimeDuration(
-        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
-        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
-        TimeUnit.MILLISECONDS);
-    LOG.trace("Starting Pipeline Lease Manager.");
-    pipelineLeaseManager = new LeaseManager<>(pipelineCreationLeaseTimeout);
-    pipelineLeaseManager.start();
-
-    // These are the steady states of a container.
-    finalStates.add(HddsProtos.LifeCycleState.OPEN);
-    finalStates.add(HddsProtos.LifeCycleState.CLOSED);
-
-    this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
-        finalStates);
-    initializeStateMachine();
-  }
-
-  /**
-   * Event and State Transition Mapping:
-   *
-   * State: ALLOCATED ---------------> CREATING
-   * Event:                CREATE
-   *
-   * State: CREATING  ---------------> OPEN
-   * Event:               CREATED
-   *
-   * State: OPEN      ---------------> CLOSING
-   * Event:               FINALIZE
-   *
-   * State: CLOSING   ---------------> CLOSED
-   * Event:                CLOSE
-   *
-   * State: CREATING  ---------------> CLOSED
-   * Event:               TIMEOUT
-   *
-   *
-   * Container State Flow:
-   *
-   * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
-   *            (CREATE)     | (CREATED)     (FINALIZE)   |
-   *                         |                            |
-   *                         |                            |
-   *                         |(TIMEOUT)                   |(CLOSE)
-   *                         |                            |
-   *                         +--------> [CLOSED] <--------+
-   */
-  private void initializeStateMachine() {
-    stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
-        HddsProtos.LifeCycleState.CREATING,
-        HddsProtos.LifeCycleEvent.CREATE);
-
-    stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
-        HddsProtos.LifeCycleState.OPEN,
-        HddsProtos.LifeCycleEvent.CREATED);
-
-    stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
-        HddsProtos.LifeCycleState.CLOSING,
-        HddsProtos.LifeCycleEvent.FINALIZE);
-
-    stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
-        HddsProtos.LifeCycleState.CLOSED,
-        HddsProtos.LifeCycleEvent.CLOSE);
-
-    stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
-        HddsProtos.LifeCycleState.CLOSED,
-        HddsProtos.LifeCycleEvent.TIMEOUT);
   }
 
   /**
@@ -172,14 +88,15 @@ public class PipelineSelector {
    * @return pipeline corresponding to nodes
    */
   public static Pipeline newPipelineFromNodes(
-      List<DatanodeDetails> nodes, ReplicationType replicationType,
-      ReplicationFactor replicationFactor, String name) {
+      List<DatanodeDetails> nodes, LifeCycleState state,
+      ReplicationType replicationType, ReplicationFactor replicationFactor,
+      String name) {
     Preconditions.checkNotNull(nodes);
     Preconditions.checkArgument(nodes.size() > 0);
     String leaderId = nodes.get(0).getUuidString();
-    // A new pipeline always starts in allocated state
-    Pipeline pipeline = new Pipeline(leaderId, LifeCycleState.ALLOCATED,
-        replicationType, replicationFactor, name);
+    Pipeline
+        pipeline = new Pipeline(leaderId, state, replicationType,
+        replicationFactor, name);
     for (DatanodeDetails node : nodes) {
       pipeline.addMember(node);
     }
@@ -258,35 +175,8 @@ public class PipelineSelector {
     LOG.debug("Getting replication pipeline forReplicationType {} :" +
             " ReplicationFactor {}", replicationType.toString(),
         replicationFactor.toString());
-
-    /**
-     * In the Ozone world, we have a very simple policy.
-     *
-     * 1. Try to create a pipeline if there are enough free nodes.
-     *
-     * 2. This allows all nodes to part of a pipeline quickly.
-     *
-     * 3. if there are not enough free nodes, return already allocated pipeline
-     * in a round-robin fashion.
-     *
-     * TODO: Might have to come up with a better algorithm than this.
-     * Create a new placement policy that returns pipelines in round robin
-     * fashion.
-     */
-    Pipeline pipeline =
-        manager.createPipeline(replicationFactor, replicationType);
-    if (pipeline == null) {
-      // try to return a pipeline from already allocated pipelines
-      pipeline = manager.getPipeline(replicationFactor, replicationType);
-    } else {
-      // if a new pipeline is created, initialize its state machine
-      updatePipelineState(pipeline,HddsProtos.LifeCycleEvent.CREATE);
-
-      //TODO: move the initialization of pipeline to Ozone Client
-      manager.initializePipeline(pipeline);
-      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATED);
-    }
-    return pipeline;
+    return manager.
+        getPipeline(replicationFactor, replicationType);
   }
 
   /**
@@ -304,6 +194,19 @@ public class PipelineSelector {
         " pipelineName:{}", replicationType, pipelineName);
     return manager.getPipeline(pipelineName);
   }
+  /**
+   * Creates a pipeline from a specified set of Nodes.
+   */
+
+  public void createPipeline(ReplicationType replicationType, String
+      pipelineID, List<DatanodeDetails> datanodes) throws IOException {
+    PipelineManager manager = getPipelineManager(replicationType);
+    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
+    LOG.debug("Creating a pipeline: {} with nodes:{}", pipelineID,
+        datanodes.stream().map(DatanodeDetails::toString)
+            .collect(Collectors.joining(",")));
+    manager.createPipeline(pipelineID, datanodes);
+  }
 
   /**
    * Close the  pipeline with the given clusterId.
@@ -348,77 +251,12 @@ public class PipelineSelector {
   }
 
   public void removePipeline(UUID dnId) {
-    Set<Pipeline> pipelineSet =
+    Set<Pipeline> pipelineChannelSet =
         node2PipelineMap.getPipelines(dnId);
-    for (Pipeline pipeline : pipelineSet) {
-      getPipelineManager(pipeline.getType())
-          .removePipeline(pipeline);
+    for (Pipeline pipelineChannel : pipelineChannelSet) {
+      getPipelineManager(pipelineChannel.getType())
+          .removePipeline(pipelineChannel);
     }
     node2PipelineMap.removeDatanode(dnId);
   }
-
-  /**
-   * Update the Pipeline State to the next state.
-   *
-   * @param pipeline - Pipeline
-   * @param event - LifeCycle Event
-   * @throws SCMException  on Failure.
-   */
-  public void updatePipelineState(Pipeline pipeline,
-      HddsProtos.LifeCycleEvent event) throws IOException {
-    HddsProtos.LifeCycleState newState;
-    try {
-      newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
-    } catch (InvalidStateTransitionException ex) {
-      String error = String.format("Failed to update pipeline state %s, " +
-              "reason: invalid state transition from state: %s upon " +
-              "event: %s.",
-          pipeline.getPipelineName(), pipeline.getLifeCycleState(), event);
-      LOG.error(error);
-      throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
-    }
-
-    // This is a post condition after executing getNextState.
-    Preconditions.checkNotNull(newState);
-    Preconditions.checkNotNull(pipeline);
-    try {
-      switch (event) {
-      case CREATE:
-        // Acquire lease on pipeline
-        Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline);
-        // Register callback to be executed in case of timeout
-        pipelineLease.registerCallBack(() -> {
-          updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.TIMEOUT);
-          return null;
-        });
-        break;
-      case CREATED:
-        // Release the lease on pipeline
-        pipelineLeaseManager.release(pipeline);
-        break;
-
-      case FINALIZE:
-        //TODO: cleanup pipeline by closing all the containers on the pipeline
-        break;
-
-      case CLOSE:
-      case TIMEOUT:
-        // TODO: Release the nodes here when pipelines are destroyed
-        break;
-      default:
-        throw new SCMException("Unsupported pipeline LifeCycleEvent.",
-            FAILED_TO_CHANGE_PIPELINE_STATE);
-      }
-
-      pipeline.setLifeCycleState(newState);
-    } catch (LeaseException e) {
-      throw new IOException("Lease Exception.", e);
-    }
-  }
-
-  public void shutdown() {
-    if (pipelineLeaseManager != null) {
-      pipelineLeaseManager.shutdown();
-    }
-  }
 }
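
The deleted updatePipelineState() above paired the state machine with a LeaseManager: CREATE acquired a lease whose timeout callback fired the TIMEOUT event, and CREATED released it. The same pattern expressed with plain java.util.concurrent primitives (a stand-in sketch, not the removed LeaseManager API):

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;

    final class CreationLeaseSketch {
      private final ScheduledExecutorService timer =
          Executors.newSingleThreadScheduledExecutor();
      private final ConcurrentMap<String, ScheduledFuture<?>> leases =
          new ConcurrentHashMap<>();

      // CREATE: acquire a lease; onTimeout stands in for firing TIMEOUT.
      void onCreate(String pipelineName, long timeoutMs, Runnable onTimeout) {
        leases.put(pipelineName,
            timer.schedule(onTimeout, timeoutMs, TimeUnit.MILLISECONDS));
      }

      // CREATED: release the lease before it expires.
      void onCreated(String pipelineName) {
        ScheduledFuture<?> lease = leases.remove(pipelineName);
        if (lease != null) {
          lease.cancel(false);
        }
      }

      void shutdown() {
        timer.shutdownNow();
      }
    }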

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index c726ef6..a8f8b20 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -71,7 +72,7 @@ public class RatisManagerImpl extends PipelineManager {
    * Allocates a new ratis Pipeline from the free nodes.
    *
    * @param factor - One or Three
-   * @return Pipeline.
+   * @return PipelineChannel.
    */
   public Pipeline allocatePipeline(ReplicationFactor factor) {
     List<DatanodeDetails> newNodesList = new LinkedList<>();
@@ -88,23 +89,35 @@ public class RatisManagerImpl extends PipelineManager {
           // further allocations
           ratisMembers.addAll(newNodesList);
           LOG.info("Allocating a new ratis pipeline of size: {}", count);
-          // Start all pipeline names with "Ratis", easy to grep the logs.
+          // Start all channel names with "Ratis", easy to grep the logs.
           String pipelineName = PREFIX +
               UUID.randomUUID().toString().substring(PREFIX.length());
-          return PipelineSelector.newPipelineFromNodes(newNodesList,
-              ReplicationType.RATIS, factor, pipelineName);
+          Pipeline pipeline=
+              PipelineSelector.newPipelineFromNodes(newNodesList,
+              LifeCycleState.OPEN, ReplicationType.RATIS, factor, pipelineName);
+          try (XceiverClientRatis client =
+              XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
+            client.createPipeline(pipeline.getPipelineName(), newNodesList);
+          } catch (IOException e) {
+            return null;
+          }
+          return pipeline;
         }
       }
     }
     return null;
   }
 
-  public void initializePipeline(Pipeline pipeline) throws IOException {
-    //TODO:move the initialization from SCM to client
-    try (XceiverClientRatis client =
-        XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
-      client.createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
-    }
+  /**
+   * Creates a pipeline from a specified set of Nodes.
+   *
+   * @param pipelineID - Name of the pipeline
+   * @param datanodes - The list of datanodes that make this pipeline.
+   */
+  @Override
+  public void createPipeline(String pipelineID,
+                             List<DatanodeDetails> datanodes) {
+
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index bb4951f..cf691bf 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -85,19 +86,29 @@ public class StandaloneManagerImpl extends PipelineManager {
           // once a datanode has been added to a pipeline, exclude it from
           // further allocations
           standAloneMembers.addAll(newNodesList);
-          LOG.info("Allocating a new standalone pipeline of size: {}", count);
-          String pipelineName =
+          LOG.info("Allocating a new standalone pipeline channel of size: {}",
+              count);
+          String channelName =
               "SA-" + UUID.randomUUID().toString().substring(3);
           return PipelineSelector.newPipelineFromNodes(newNodesList,
-              ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
+              LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
+              ReplicationFactor.ONE, channelName);
         }
       }
     }
     return null;
   }
 
-  public void initializePipeline(Pipeline pipeline) {
-    // Nothing to be done for standalone pipeline
+  /**
+   * Creates a pipeline from a specified set of Nodes.
+   *
+   * @param pipelineID - Name of the pipeline
+   * @param datanodes - The list of datanodes that make this pipeline.
+   */
+  @Override
+  public void createPipeline(String pipelineID,
+                             List<DatanodeDetails> datanodes) {
+    //return newPipelineFromNodes(datanodes, pipelineID);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d2acf8d5/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index ffac6d5..bc3505f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers
     .ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.junit.AfterClass;
@@ -53,7 +51,6 @@ public class TestNode2PipelineMap {
   private static ContainerWithPipeline ratisContainer;
   private static ContainerStateMap stateMap;
   private static ContainerMapping mapping;
-  private static PipelineSelector pipelineSelector;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -69,7 +66,6 @@ public class TestNode2PipelineMap {
     mapping = (ContainerMapping)scm.getScmContainerManager();
     stateMap = mapping.getStateManager().getContainerStateMap();
     ratisContainer = mapping.allocateContainer(RATIS, THREE, "testOwner");
-    pipelineSelector = mapping.getPipelineSelector();
   }
 
   /**
@@ -117,15 +113,5 @@ public class TestNode2PipelineMap {
     NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline(
         ratisContainer.getPipeline().getPipelineName());
     Assert.assertEquals(0, set2.size());
-
-    try {
-      pipelineSelector.updatePipelineState(ratisContainer.getPipeline(),
-          HddsProtos.LifeCycleEvent.CLOSE);
-      Assert.fail("closing of pipeline without finalize should fail");
-    } catch (Exception e) {
-      Assert.assertTrue(e instanceof SCMException);
-      Assert.assertEquals(((SCMException)e).getResult(),
-          SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE);
-    }
   }
 }




[3/3] hadoop git commit: HDDS-260. Support in Datanode for sending ContainerActions to SCM. Contributed by Nanda kumar.

Posted by xy...@apache.org.
HDDS-260. Support in Datanode for sending ContainerActions to SCM. Contributed by Nanda kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/347c9550
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/347c9550
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/347c9550

Branch: refs/heads/trunk
Commit: 347c9550135ea10fd84d5007124452bf5f2d6619
Parents: 9be25e3
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Fri Jul 20 14:37:13 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Fri Jul 20 14:37:13 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hdds/HddsConfigKeys.java  |   6 +
 .../common/src/main/resources/ozone-default.xml |  10 +
 .../common/statemachine/StateContext.java       |  55 +++-
 .../states/endpoint/HeartbeatEndpointTask.java  |  33 +-
 .../StorageContainerDatanodeProtocol.proto      |   4 +-
 .../common/report/TestReportPublisher.java      |  41 ---
 .../endpoint/TestHeartbeatEndpointTask.java     | 300 +++++++++++++++++++
 .../common/states/endpoint/package-info.java    |  18 ++
 8 files changed, 414 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
index 0283615..fd4bf08 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/HddsConfigKeys.java
@@ -48,4 +48,10 @@ public final class HddsConfigKeys {
       "hdds.command.status.report.interval";
   public static final String HDDS_COMMAND_STATUS_REPORT_INTERVAL_DEFAULT =
       "60s";
+
+  public static final String HDDS_CONTAINER_ACTION_MAX_LIMIT =
+      "hdds.container.action.max.limit";
+  public static final int HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT =
+      20;
+
 }
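
For reference, a minimal sketch of how this limit is picked up from configuration, mirroring the conf.getInt call that HeartbeatEndpointTask makes further down in this commit (the class name here is illustrative only):

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;

    import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_ACTION_MAX_LIMIT;
    import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT;

    public final class ActionLimitSketch {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Falls back to 20 when hdds.container.action.max.limit is unset.
        int maxActionsPerHeartbeat = conf.getInt(
            HDDS_CONTAINER_ACTION_MAX_LIMIT,
            HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT);
        System.out.println("Max container actions per heartbeat: "
            + maxActionsPerHeartbeat);
      }
    }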

http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 69a382a..84a3e0c 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1098,4 +1098,14 @@
     </description>
   </property>
 
+  <property>
+    <name>hdds.container.action.max.limit</name>
+    <value>20</value>
+    <tag>DATANODE</tag>
+    <description>
+      Maximum number of Container Actions sent by the datanode to SCM in a
+      single heartbeat.
+    </description>
+  </property>
+
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index faaff69..7862cc6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -20,14 +20,18 @@ import com.google.protobuf.GeneratedMessage;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
 import org.apache.hadoop.ozone.container.common.states.DatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode
     .InitDatanodeState;
 import org.apache.hadoop.ozone.container.common.states.datanode
     .RunningDatanodeState;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
-import org.apache.hadoop.ozone.protocol.commands.CommandStatus.CommandStatusBuilder;
+import org.apache.hadoop.ozone.protocol.commands.CommandStatus
+    .CommandStatusBuilder;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,6 +47,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
 
@@ -59,6 +64,7 @@ public class StateContext {
   private final AtomicLong stateExecutionCount;
   private final Configuration conf;
   private final Queue<GeneratedMessage> reports;
+  private final Queue<ContainerAction> containerActions;
   private DatanodeStateMachine.DatanodeStates state;
 
   /**
@@ -76,6 +82,7 @@ public class StateContext {
     commandQueue = new LinkedList<>();
     cmdStatusMap = new ConcurrentHashMap<>();
     reports = new LinkedList<>();
+    containerActions = new LinkedList<>();
     lock = new ReentrantLock();
     stateExecutionCount = new AtomicLong(0);
   }
@@ -187,15 +194,45 @@ public class StateContext {
    * @return List<reports>
    */
   public List<GeneratedMessage> getReports(int maxLimit) {
-    List<GeneratedMessage> results = new ArrayList<>();
     synchronized (reports) {
-      GeneratedMessage report = reports.poll();
-      while(results.size() < maxLimit && report != null) {
-        results.add(report);
-        report = reports.poll();
-      }
+      return reports.parallelStream().limit(maxLimit)
+          .collect(Collectors.toList());
+    }
+  }
+
+
+  /**
+   * Adds the ContainerAction to ContainerAction queue.
+   *
+   * @param containerAction ContainerAction to be added
+   */
+  public void addContainerAction(ContainerAction containerAction) {
+    synchronized (containerActions) {
+      containerActions.add(containerAction);
+    }
+  }
+
+  /**
+   * Returns all the pending ContainerActions from the ContainerAction queue,
+   * or an empty list if the queue is empty.
+   *
+   * @return List<ContainerAction>
+   */
+  public List<ContainerAction> getAllPendingContainerActions() {
+    return getPendingContainerAction(Integer.MAX_VALUE);
+  }
+
+  /**
+   * Returns pending ContainerActions from the ContainerAction queue with a
+   * max limit on list size, or an empty list if the queue is empty.
+   *
+   * @return List<ContainerAction>
+   */
+  public List<ContainerAction> getPendingContainerAction(int maxLimit) {
+    synchronized (containerActions) {
+      return containerActions.parallelStream().limit(maxLimit)
+          .collect(Collectors.toList());
     }
-    return results;
   }
 
   /**
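
A small sketch of the new ContainerAction queue in action, assuming a mocked DatanodeStateMachine exactly as the tests later in this mail do. Note that getPendingContainerAction only limits the size of the returned list; it does not drain the queue:

    import java.util.List;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
    import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
    import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
    import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
    import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
    import org.mockito.Mockito;

    public final class ContainerActionQueueSketch {
      public static void main(String[] args) {
        Configuration conf = new OzoneConfiguration();
        StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
            Mockito.mock(DatanodeStateMachine.class));
        ContainerAction action = ContainerAction.newBuilder()
            .setContainer(ContainerInfo.newBuilder().setContainerID(1L).build())
            .setAction(ContainerAction.Action.CLOSE)
            .setReason(ContainerAction.Reason.CONTAINER_FULL)
            .build();
        context.addContainerAction(action);
        // The limit caps the returned list; the action itself stays queued.
        List<ContainerAction> pending = context.getPendingContainerAction(10);
        System.out.println("pending actions: " + pending.size());  // 1
      }
    }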

http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 260a245..020fb71 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -25,6 +25,10 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
 import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
@@ -46,8 +50,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.time.ZonedDateTime;
+import java.util.List;
 import java.util.concurrent.Callable;
 
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_CONTAINER_ACTION_MAX_LIMIT;
+import static org.apache.hadoop.hdds.HddsConfigKeys
+    .HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT;
+
 /**
  * Heartbeat class for SCMs.
  */
@@ -59,6 +69,7 @@ public class HeartbeatEndpointTask
   private final Configuration conf;
   private DatanodeDetailsProto datanodeDetailsProto;
   private StateContext context;
+  private int maxContainerActionsPerHB;
 
   /**
    * Constructs a SCM heart beat.
@@ -70,6 +81,8 @@ public class HeartbeatEndpointTask
     this.rpcEndpoint = rpcEndpoint;
     this.conf = conf;
     this.context = context;
+    this.maxContainerActionsPerHB = conf.getInt(HDDS_CONTAINER_ACTION_MAX_LIMIT,
+        HDDS_CONTAINER_ACTION_MAX_LIMIT_DEFAULT);
   }
 
   /**
@@ -107,7 +120,7 @@ public class HeartbeatEndpointTask
           SCMHeartbeatRequestProto.newBuilder()
               .setDatanodeDetails(datanodeDetailsProto);
       addReports(requestBuilder);
-
+      addContainerActions(requestBuilder);
      SCMHeartbeatResponseProto response = rpcEndpoint.getEndPoint()
          .sendHeartbeat(requestBuilder.build());
      processResponse(response, datanodeDetailsProto);
@@ -140,6 +153,24 @@ public class HeartbeatEndpointTask
   }
 
   /**
+   * Adds all the pending ContainerActions to the heartbeat.
+   *
+   * @param requestBuilder builder to which the container actions have to
+   *                       be added.
+   */
+  private void addContainerActions(
+      SCMHeartbeatRequestProto.Builder requestBuilder) {
+    List<ContainerAction> actions = context.getPendingContainerAction(
+        maxContainerActionsPerHB);
+    if (!actions.isEmpty()) {
+      ContainerActionsProto cap = ContainerActionsProto.newBuilder()
+          .addAllContainerActions(actions)
+          .build();
+      requestBuilder.setContainerActions(cap);
+    }
+  }
+
+
+  /**
    * Returns a builder class for HeartbeatEndpointTask task.
    * @return   Builder.
    */
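
For reference, a sketch of the request this task assembles once container actions are pending, built directly against the generated protos (the DatanodeDetails builder calls mirror the test helper shown later in this mail):

    import java.util.Collections;
    import java.util.UUID;
    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
    import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerActionsProto;
    import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerInfo;
    import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;

    public final class HeartbeatAssemblySketch {
      public static void main(String[] args) {
        DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder()
            .setUuid(UUID.randomUUID().toString())
            .setHostName("localhost")
            .setIpAddress("127.0.0.1")
            .build();
        ContainerAction close = ContainerAction.newBuilder()
            .setContainer(ContainerInfo.newBuilder().setContainerID(1L).build())
            .setAction(ContainerAction.Action.CLOSE)
            .setReason(ContainerAction.Reason.CONTAINER_FULL)
            .build();
        // addContainerActions only sets this field when the pending list is
        // non-empty, so an idle datanode sends no ContainerActionsProto.
        SCMHeartbeatRequestProto heartbeat = SCMHeartbeatRequestProto.newBuilder()
            .setDatanodeDetails(datanodeDetails.getProtoBufMessage())
            .setContainerActions(ContainerActionsProto.newBuilder()
                .addAllContainerActions(Collections.singletonList(close)))
            .build();
        System.out.println(heartbeat.hasContainerActions());  // true
      }
    }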

http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 4238389..d89567b 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -79,8 +79,8 @@ message SCMHeartbeatRequestProto {
   required DatanodeDetailsProto datanodeDetails = 1;
   optional NodeReportProto nodeReport = 2;
   optional ContainerReportsProto containerReport = 3;
-  optional ContainerActionsProto containerActions = 4;
-  optional CommandStatusReportsProto commandStatusReport = 5;
+  optional CommandStatusReportsProto commandStatusReport = 4;
+  optional ContainerActionsProto containerActions = 5;
 }
 
 /*

http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
index a0db2e8..811599f 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/report/TestReportPublisher.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.ozone.container.common.report;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.protobuf.Descriptors;
 import com.google.protobuf.GeneratedMessage;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -28,14 +27,8 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto.
     StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
 import org.apache.hadoop.util.concurrent.HadoopExecutors;
@@ -178,22 +171,6 @@ public class TestReportPublisher {
     executorService.shutdown();
   }
 
-  @Test
-  public void testAddingReportToHeartbeat() {
-    GeneratedMessage nodeReport = NodeReportProto.getDefaultInstance();
-    GeneratedMessage containerReport = ContainerReportsProto
-        .getDefaultInstance();
-    SCMHeartbeatRequestProto.Builder heartbeatBuilder =
-        SCMHeartbeatRequestProto.newBuilder();
-    heartbeatBuilder.setDatanodeDetails(
-        getDatanodeDetails().getProtoBufMessage());
-    addReport(heartbeatBuilder, nodeReport);
-    addReport(heartbeatBuilder, containerReport);
-    SCMHeartbeatRequestProto heartbeat = heartbeatBuilder.build();
-    Assert.assertTrue(heartbeat.hasNodeReport());
-    Assert.assertTrue(heartbeat.hasContainerReport());
-  }
-
   /**
    * Get a datanode details.
    *
@@ -222,22 +199,4 @@ public class TestReportPublisher {
     return builder.build();
   }
 
-  /**
-   * Adds the report to heartbeat.
-   *
-   * @param requestBuilder builder to which the report has to be added.
-   * @param report         the report to be added.
-   */
-  private static void addReport(SCMHeartbeatRequestProto.Builder
-      requestBuilder, GeneratedMessage report) {
-    String reportName = report.getDescriptorForType().getFullName();
-    for (Descriptors.FieldDescriptor descriptor :
-        SCMHeartbeatRequestProto.getDescriptor().getFields()) {
-      String heartbeatFieldName = descriptor.getMessageType().getFullName();
-      if (heartbeatFieldName.equals(reportName)) {
-        requestBuilder.setField(descriptor, report);
-      }
-    }
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
new file mode 100644
index 0000000..b4d718d
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -0,0 +1,300 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.states.endpoint;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerInfo;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .DatanodeStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .DatanodeStateMachine.DatanodeStates;
+import org.apache.hadoop.ozone.container.common.statemachine
+    .EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.protocolPB
+    .StorageContainerDatanodeProtocolClientSideTranslatorPB;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mockito;
+
+import java.util.UUID;
+
+/**
+ * This class tests the functionality of HeartbeatEndpointTask.
+ */
+public class TestHeartbeatEndpointTask {
+
+
+  @Test
+  public void testHeartbeatWithoutReports() throws Exception {
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(scm);
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertFalse(heartbeat.hasNodeReport());
+    Assert.assertFalse(heartbeat.hasContainerReport());
+    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertFalse(heartbeat.hasContainerActions());
+  }
+
+  @Test
+  public void testHeartbeatWithNodeReports() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+        conf, context, scm);
+    context.addReport(NodeReportProto.getDefaultInstance());
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertTrue(heartbeat.hasNodeReport());
+    Assert.assertFalse(heartbeat.hasContainerReport());
+    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertFalse(heartbeat.hasContainerActions());
+  }
+
+  @Test
+  public void testHeartbeatWithContainerReports() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+        conf, context, scm);
+    context.addReport(ContainerReportsProto.getDefaultInstance());
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertFalse(heartbeat.hasNodeReport());
+    Assert.assertTrue(heartbeat.hasContainerReport());
+    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertFalse(heartbeat.hasContainerActions());
+  }
+
+  @Test
+  public void testHeartbeatWithCommandStatusReports() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+        conf, context, scm);
+    context.addReport(CommandStatusReportsProto.getDefaultInstance());
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertFalse(heartbeat.hasNodeReport());
+    Assert.assertFalse(heartbeat.hasContainerReport());
+    Assert.assertTrue(heartbeat.hasCommandStatusReport());
+    Assert.assertFalse(heartbeat.hasContainerActions());
+  }
+
+  @Test
+  public void testHeartbeatWithContainerActions() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+        conf, context, scm);
+    context.addContainerAction(getContainerAction());
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertFalse(heartbeat.hasNodeReport());
+    Assert.assertFalse(heartbeat.hasContainerReport());
+    Assert.assertFalse(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.hasContainerActions());
+  }
+
+  @Test
+  public void testHeartbeatWithAllReports() throws Exception {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+
+    StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
+        Mockito.mock(
+            StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+    ArgumentCaptor<SCMHeartbeatRequestProto> argument = ArgumentCaptor
+        .forClass(SCMHeartbeatRequestProto.class);
+    Mockito.when(scm.sendHeartbeat(argument.capture()))
+        .thenAnswer(invocation ->
+            SCMHeartbeatResponseProto.newBuilder()
+                .setDatanodeUUID(
+                    ((SCMHeartbeatRequestProto)invocation.getArgument(0))
+                        .getDatanodeDetails().getUuid())
+                .build());
+
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
+        conf, context, scm);
+    context.addReport(NodeReportProto.getDefaultInstance());
+    context.addReport(ContainerReportsProto.getDefaultInstance());
+    context.addReport(CommandStatusReportsProto.getDefaultInstance());
+    context.addContainerAction(getContainerAction());
+    endpointTask.call();
+    SCMHeartbeatRequestProto heartbeat = argument.getValue();
+    Assert.assertTrue(heartbeat.hasDatanodeDetails());
+    Assert.assertTrue(heartbeat.hasNodeReport());
+    Assert.assertTrue(heartbeat.hasContainerReport());
+    Assert.assertTrue(heartbeat.hasCommandStatusReport());
+    Assert.assertTrue(heartbeat.hasContainerActions());
+  }
+
+  /**
+   * Creates HeartbeatEndpointTask for the given StorageContainerManager proxy.
+   *
+   * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
+   *
+   * @return HeartbeatEndpointTask
+   */
+  private HeartbeatEndpointTask getHeartbeatEndpointTask(
+      StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
+    Configuration conf = new OzoneConfiguration();
+    StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
+        Mockito.mock(DatanodeStateMachine.class));
+    return getHeartbeatEndpointTask(conf, context, proxy);
+
+  }
+
+  /**
+   * Creates HeartbeatEndpointTask with the given conf, context and
+   * StorageContainerManager client side proxy.
+   *
+   * @param conf Configuration
+   * @param context StateContext
+   * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
+   *
+   * @return HeartbeatEndpointTask
+   */
+  private HeartbeatEndpointTask getHeartbeatEndpointTask(
+      Configuration conf,
+      StateContext context,
+      StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
+    DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder()
+        .setUuid(UUID.randomUUID().toString())
+        .setHostName("localhost")
+        .setIpAddress("127.0.0.1")
+        .build();
+    EndpointStateMachine endpointStateMachine = Mockito
+        .mock(EndpointStateMachine.class);
+    Mockito.when(endpointStateMachine.getEndPoint()).thenReturn(proxy);
+    return HeartbeatEndpointTask.newBuilder()
+        .setConfig(conf)
+        .setDatanodeDetails(datanodeDetails)
+        .setContext(context)
+        .setEndpointStateMachine(endpointStateMachine)
+        .build();
+  }
+
+  private ContainerAction getContainerAction() {
+    ContainerAction.Builder builder = ContainerAction.newBuilder();
+    ContainerInfo containerInfo = ContainerInfo.newBuilder()
+        .setContainerID(1L)
+        .build();
+    builder.setContainer(containerInfo)
+        .setAction(ContainerAction.Action.CLOSE)
+        .setReason(ContainerAction.Reason.CONTAINER_FULL);
+    return builder.build();
+  }
+}
\ No newline at end of file
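
All of the tests above lean on the same Mockito idiom: capture the outbound heartbeat with an ArgumentCaptor while answering with a response derived from the captured request. A stripped-down version of that idiom, using a hypothetical Echo interface purely for illustration:

    import org.mockito.ArgumentCaptor;
    import org.mockito.Mockito;

    public final class CaptorIdiomSketch {
      // Hypothetical service, standing in for the SCM protocol translator.
      interface Echo {
        String call(String request);
      }

      public static void main(String[] args) {
        Echo echo = Mockito.mock(Echo.class);
        ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
        // The Answer builds the reply from the request, just as the tests
        // above echo the datanode UUID back in the heartbeat response.
        Mockito.when(echo.call(captor.capture()))
            .thenAnswer(invocation -> "ack:" + invocation.getArgument(0));
        System.out.println(echo.call("hb-1"));     // ack:hb-1
        System.out.println(captor.getValue());     // hb-1
      }
    }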

http://git-wip-us.apache.org/repos/asf/hadoop/blob/347c9550/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
new file mode 100644
index 0000000..d120a5c
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone.container.common.states.endpoint;




[2/3] hadoop git commit: HDDS-239. Add PipelineStateManager to track pipeline state transition. Contributed by Mukul Kumar Singh.

Posted by xy...@apache.org.
HDDS-239. Add PipelineStateManager to track pipeline state transition. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9be25e34
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9be25e34
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9be25e34

Branch: refs/heads/trunk
Commit: 9be25e347683d26e0575458c7f470c76fd4d951b
Parents: d2acf8d
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Fri Jul 20 14:22:02 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Fri Jul 20 14:22:02 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdds/scm/ScmConfigKeys.java   |   5 +
 .../scm/container/common/helpers/Pipeline.java  |   7 +
 .../common/src/main/resources/ozone-default.xml |  12 ++
 .../hdds/scm/container/ContainerMapping.java    |   4 +
 .../hdds/scm/exceptions/SCMException.java       |   1 +
 .../hdds/scm/pipelines/PipelineManager.java     |  64 +++---
 .../hdds/scm/pipelines/PipelineSelector.java    | 212 ++++++++++++++++---
 .../scm/pipelines/ratis/RatisManagerImpl.java   |  33 +--
 .../standalone/StandaloneManagerImpl.java       |  21 +-
 .../hdds/scm/pipeline/TestNode2PipelineMap.java |  14 ++
 10 files changed, 273 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 71184cf..6e940ad 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -236,6 +236,11 @@ public final class ScmConfigKeys {
   public static final String
       OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
 
+  public static final String OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT =
+      "ozone.scm.pipeline.creation.lease.timeout";
+
+  public static final String
+      OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
 
   public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
       "ozone.scm.block.deletion.max.retry";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index c5794f4..534c9fd 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -214,6 +214,13 @@ public class Pipeline {
   }
 
   /**
+   * Update the state of the pipeline.
+   */
+  public void setLifeCycleState(HddsProtos.LifeCycleState nextState) {
+    lifeCycleState = nextState;
+  }
+
+  /**
    * Gets the pipeline Name.
    *
    * @return - Name of the pipeline

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 5a1d26a..69a382a 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1085,5 +1085,17 @@
       executed since last report. Unit could be defined with
       postfix (ns,ms,s,m,h,d)</description>
   </property>
+  <property>
+    <name>ozone.scm.pipeline.creation.lease.timeout</name>
+    <value>60s</value>
+    <tag>OZONE, SCM, PIPELINE</tag>
+    <description>
+      Pipeline creation timeout used by SCM. When a CREATE event occurs,
+      the pipeline moves from the ALLOCATED to the CREATING state, and SCM
+      waits the configured amount of time for the CREATED event; if that
+      event is not received in time, the pipeline is moved to CLOSED via
+      a TIMEOUT event.
+    </description>
+  </property>
 
 </configuration>
\ No newline at end of file
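
For reference, a minimal sketch of how SCM resolves this value, mirroring the conf.getTimeDuration call in the PipelineSelector constructor below (the class name here is illustrative only):

    import java.util.concurrent.TimeUnit;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    public final class LeaseTimeoutSketch {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // "60s" (or any ns/ms/s/m/h/d suffixed value) is parsed and
        // returned here in milliseconds.
        long timeoutMillis = conf.getTimeDuration(
            ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
            ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
            TimeUnit.MILLISECONDS);
        System.out.println("pipeline creation lease: " + timeoutMillis + " ms");
      }
    }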

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 26f4d86..f07d22b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -658,6 +658,10 @@ public class ContainerMapping implements Mapping {
     if (containerStore != null) {
       containerStore.close();
     }
+
+    if (pipelineSelector != null) {
+      pipelineSelector.shutdown();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
index d7d70ef..0085542 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
@@ -107,6 +107,7 @@ public class SCMException extends IOException {
     FAILED_TO_LOAD_OPEN_CONTAINER,
     FAILED_TO_ALLOCATE_CONTAINER,
     FAILED_TO_CHANGE_CONTAINER_STATE,
+    FAILED_TO_CHANGE_PIPELINE_STATE,
     CONTAINER_EXISTS,
     FAILED_TO_FIND_CONTAINER,
     FAILED_TO_FIND_CONTAINER_WITH_SPACE,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index a041973..77d8211 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -59,41 +59,16 @@ public abstract class PipelineManager {
    * @return a Pipeline.
    */
   public synchronized final Pipeline getPipeline(
-      ReplicationFactor replicationFactor, ReplicationType replicationType)
-      throws IOException {
-    /**
-     * In the Ozone world, we have a very simple policy.
-     *
-     * 1. Try to create a pipeline if there are enough free nodes.
-     *
-     * 2. This allows all nodes to part of a pipeline quickly.
-     *
-     * 3. if there are not enough free nodes, return pipeline in a
-     * round-robin fashion.
-     *
-     * TODO: Might have to come up with a better algorithm than this.
-     * Create a new placement policy that returns pipelines in round robin
-     * fashion.
-     */
-    Pipeline pipeline = allocatePipeline(replicationFactor);
+      ReplicationFactor replicationFactor, ReplicationType replicationType) {
+    Pipeline pipeline = findOpenPipeline(replicationType, replicationFactor);
     if (pipeline != null) {
-      LOG.debug("created new pipeline:{} for container with " +
+      LOG.debug("re-used pipeline:{} for container with " +
               "replicationType:{} replicationFactor:{}",
           pipeline.getPipelineName(), replicationType, replicationFactor);
-      activePipelines.add(pipeline);
-      activePipelineMap.put(pipeline.getPipelineName(), pipeline);
-      node2PipelineMap.addPipeline(pipeline);
-    } else {
-      pipeline = findOpenPipeline(replicationType, replicationFactor);
-      if (pipeline != null) {
-        LOG.debug("re-used pipeline:{} for container with " +
-                "replicationType:{} replicationFactor:{}",
-            pipeline.getPipelineName(), replicationType, replicationFactor);
-      }
     }
     if (pipeline == null) {
      LOG.error("Get pipeline call failed. We are not able to find" +
-              "free nodes or operational pipeline.");
+              " an operational pipeline.");
       return null;
     } else {
       return pipeline;
@@ -109,7 +84,7 @@ public abstract class PipelineManager {
   public synchronized final Pipeline getPipeline(String pipelineName) {
     Pipeline pipeline = null;
 
-    // 1. Check if pipeline channel already exists
+    // 1. Check if pipeline already exists
     if (activePipelineMap.containsKey(pipelineName)) {
       pipeline = activePipelineMap.get(pipelineName);
       LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
@@ -132,7 +107,13 @@ public abstract class PipelineManager {
   }
 
   public abstract Pipeline allocatePipeline(
-      ReplicationFactor replicationFactor) throws IOException;
+      ReplicationFactor replicationFactor);
+
+  /**
+   * Initialize the pipeline
+   * TODO: move the initialization to Ozone Client later
+   */
+  public abstract void initializePipeline(Pipeline pipeline) throws IOException;
 
   public void removePipeline(Pipeline pipeline) {
     activePipelines.remove(pipeline);
@@ -179,12 +160,23 @@ public abstract class PipelineManager {
   }
 
   /**
-   * Creates a pipeline from a specified set of Nodes.
-   * @param pipelineID - Name of the pipeline
-   * @param datanodes - The list of datanodes that make this pipeline.
+   * Creates a pipeline with a specified replication factor and type.
+   * @param replicationFactor - Replication Factor.
+   * @param replicationType - Replication Type.
    */
-  public abstract void createPipeline(String pipelineID,
-      List<DatanodeDetails> datanodes) throws IOException;
+  public Pipeline createPipeline(ReplicationFactor replicationFactor,
+      ReplicationType replicationType) throws IOException {
+    Pipeline pipeline = allocatePipeline(replicationFactor);
+    if (pipeline != null) {
+      LOG.debug("created new pipeline:{} for container with "
+              + "replicationType:{} replicationFactor:{}",
+          pipeline.getPipelineName(), replicationType, replicationFactor);
+      activePipelines.add(pipeline);
+      activePipelineMap.put(pipeline.getPipelineName(), pipeline);
+      node2PipelineMap.addPipeline(pipeline);
+    }
+    return pipeline;
+  }
 
   /**
   * Close the pipeline with the given clusterId.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index 2955af5..08710e7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
 import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
@@ -33,17 +34,28 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.statemachine
+    .InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.apache.hadoop.ozone.lease.Lease;
+import org.apache.hadoop.ozone.lease.LeaseException;
+import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_CHANGE_PIPELINE_STATE;
+
 /**
  * Sends the request to the right pipeline manager.
  */
@@ -57,6 +69,10 @@ public class PipelineSelector {
   private final StandaloneManagerImpl standaloneManager;
   private final long containerSize;
   private final Node2PipelineMap node2PipelineMap;
+  private final LeaseManager<Pipeline> pipelineLeaseManager;
+  private final StateMachine<LifeCycleState,
+      HddsProtos.LifeCycleEvent> stateMachine;
+
   /**
    * Constructs a pipeline Selector.
    *
@@ -77,6 +93,74 @@ public class PipelineSelector {
     this.ratisManager =
         new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
             conf, node2PipelineMap);
+    // Initialize the pipeline state machine.
+    Set<HddsProtos.LifeCycleState> finalStates = new HashSet<>();
+    long pipelineCreationLeaseTimeout = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    LOG.trace("Starting Pipeline Lease Manager.");
+    pipelineLeaseManager = new LeaseManager<>(pipelineCreationLeaseTimeout);
+    pipelineLeaseManager.start();
+
+    // These are the steady states of a pipeline.
+    finalStates.add(HddsProtos.LifeCycleState.OPEN);
+    finalStates.add(HddsProtos.LifeCycleState.CLOSED);
+
+    this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
+        finalStates);
+    initializeStateMachine();
+  }
+
+  /**
+   * Event and State Transition Mapping:
+   *
+   * State: ALLOCATED ---------------> CREATING
+   * Event:                CREATE
+   *
+   * State: CREATING  ---------------> OPEN
+   * Event:               CREATED
+   *
+   * State: OPEN      ---------------> CLOSING
+   * Event:               FINALIZE
+   *
+   * State: CLOSING   ---------------> CLOSED
+   * Event:                CLOSE
+   *
+   * State: CREATING  ---------------> CLOSED
+   * Event:               TIMEOUT
+   *
+   *
+   * Pipeline State Flow:
+   *
+   * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
+   *            (CREATE)     | (CREATED)     (FINALIZE)   |
+   *                         |                            |
+   *                         |                            |
+   *                         |(TIMEOUT)                   |(CLOSE)
+   *                         |                            |
+   *                         +--------> [CLOSED] <--------+
+   */
+  private void initializeStateMachine() {
+    stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
+        HddsProtos.LifeCycleState.CREATING,
+        HddsProtos.LifeCycleEvent.CREATE);
+
+    stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
+        HddsProtos.LifeCycleState.OPEN,
+        HddsProtos.LifeCycleEvent.CREATED);
+
+    stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
+        HddsProtos.LifeCycleState.CLOSING,
+        HddsProtos.LifeCycleEvent.FINALIZE);
+
+    stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
+        HddsProtos.LifeCycleState.CLOSED,
+        HddsProtos.LifeCycleEvent.CLOSE);
+
+    stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
+        HddsProtos.LifeCycleState.CLOSED,
+        HddsProtos.LifeCycleEvent.TIMEOUT);
   }
 
   /**
@@ -88,15 +172,14 @@ public class PipelineSelector {
    * @return pipeline corresponding to nodes
    */
   public static Pipeline newPipelineFromNodes(
-      List<DatanodeDetails> nodes, LifeCycleState state,
-      ReplicationType replicationType, ReplicationFactor replicationFactor,
-      String name) {
+      List<DatanodeDetails> nodes, ReplicationType replicationType,
+      ReplicationFactor replicationFactor, String name) {
     Preconditions.checkNotNull(nodes);
     Preconditions.checkArgument(nodes.size() > 0);
     String leaderId = nodes.get(0).getUuidString();
-    Pipeline
-        pipeline = new Pipeline(leaderId, state, replicationType,
-        replicationFactor, name);
+    // A new pipeline always starts in allocated state
+    Pipeline pipeline = new Pipeline(leaderId, LifeCycleState.ALLOCATED,
+        replicationType, replicationFactor, name);
     for (DatanodeDetails node : nodes) {
       pipeline.addMember(node);
     }
@@ -175,8 +258,35 @@ public class PipelineSelector {
    LOG.debug("Getting replication pipeline for ReplicationType {} :" +
             " ReplicationFactor {}", replicationType.toString(),
         replicationFactor.toString());
-    return manager.
-        getPipeline(replicationFactor, replicationType);
+
+    /**
+     * In the Ozone world, we have a very simple policy.
+     *
+     * 1. Try to create a pipeline if there are enough free nodes.
+     *
+     * 2. This allows all nodes to be part of a pipeline quickly.
+     *
+     * 3. If there are not enough free nodes, return an already allocated
+     * pipeline in a round-robin fashion.
+     *
+     * TODO: Might have to come up with a better algorithm than this.
+     * Create a new placement policy that returns pipelines in round robin
+     * fashion.
+     */
+    Pipeline pipeline =
+        manager.createPipeline(replicationFactor, replicationType);
+    if (pipeline == null) {
+      // try to return a pipeline from already allocated pipelines
+      pipeline = manager.getPipeline(replicationFactor, replicationType);
+    } else {
+      // if a new pipeline is created, initialize its state machine
+      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATE);
+
+      //TODO: move the initialization of pipeline to Ozone Client
+      manager.initializePipeline(pipeline);
+      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATED);
+    }
+    return pipeline;
   }
 
   /**
@@ -194,19 +304,6 @@ public class PipelineSelector {
         " pipelineName:{}", replicationType, pipelineName);
     return manager.getPipeline(pipelineName);
   }
-  /**
-   * Creates a pipeline from a specified set of Nodes.
-   */
-
-  public void createPipeline(ReplicationType replicationType, String
-      pipelineID, List<DatanodeDetails> datanodes) throws IOException {
-    PipelineManager manager = getPipelineManager(replicationType);
-    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    LOG.debug("Creating a pipeline: {} with nodes:{}", pipelineID,
-        datanodes.stream().map(DatanodeDetails::toString)
-            .collect(Collectors.joining(",")));
-    manager.createPipeline(pipelineID, datanodes);
-  }
 
   /**
   * Close the pipeline with the given clusterId.
@@ -251,12 +348,77 @@ public class PipelineSelector {
   }
 
   public void removePipeline(UUID dnId) {
-    Set<Pipeline> pipelineChannelSet =
+    Set<Pipeline> pipelineSet =
         node2PipelineMap.getPipelines(dnId);
-    for (Pipeline pipelineChannel : pipelineChannelSet) {
-      getPipelineManager(pipelineChannel.getType())
-          .removePipeline(pipelineChannel);
+    for (Pipeline pipeline : pipelineSet) {
+      getPipelineManager(pipeline.getType())
+          .removePipeline(pipeline);
     }
     node2PipelineMap.removeDatanode(dnId);
   }
+
+  /**
+   * Update the Pipeline State to the next state.
+   *
+   * @param pipeline - Pipeline
+   * @param event - LifeCycle Event
+   * @throws SCMException  on Failure.
+   */
+  public void updatePipelineState(Pipeline pipeline,
+      HddsProtos.LifeCycleEvent event) throws IOException {
+    HddsProtos.LifeCycleState newState;
+    try {
+      newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
+    } catch (InvalidStateTransitionException ex) {
+      String error = String.format("Failed to update pipeline state %s, " +
+              "reason: invalid state transition from state: %s upon " +
+              "event: %s.",
+          pipeline.getPipelineName(), pipeline.getLifeCycleState(), event);
+      LOG.error(error);
+      throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
+    }
+
+    // This is a post condition after executing getNextState.
+    Preconditions.checkNotNull(newState);
+    Preconditions.checkNotNull(pipeline);
+    try {
+      switch (event) {
+      case CREATE:
+        // Acquire lease on pipeline
+        Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline);
+        // Register callback to be executed in case of timeout
+        pipelineLease.registerCallBack(() -> {
+          updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.TIMEOUT);
+          return null;
+        });
+        break;
+      case CREATED:
+        // Release the lease on pipeline
+        pipelineLeaseManager.release(pipeline);
+        break;
+
+      case FINALIZE:
+        // TODO: clean up the pipeline by closing all the containers on it
+        break;
+
+      case CLOSE:
+      case TIMEOUT:
+        // TODO: Release the nodes here when pipelines are destroyed
+        break;
+      default:
+        throw new SCMException("Unsupported pipeline LifeCycleEvent.",
+            FAILED_TO_CHANGE_PIPELINE_STATE);
+      }
+
+      pipeline.setLifeCycleState(newState);
+    } catch (LeaseException e) {
+      throw new IOException("Lease Exception.", e);
+    }
+  }
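
The CREATE/CREATED arms above pair every new pipeline with a lease so that a pipeline stuck mid-creation is timed out automatically: acquiring the lease arms a timer and registers the TIMEOUT callback, and releasing it on CREATED disarms the timer. A rough sketch of that pattern using a plain ScheduledExecutorService instead of the actual LeaseManager (all names here are illustrative):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Illustrative timeout guard: runs a callback unless released in time. */
class CreationLease {
  private final ScheduledExecutorService timer =
      Executors.newSingleThreadScheduledExecutor();
  private ScheduledFuture<?> pending;

  /** On CREATE: schedule the timeout action, e.g. firing a TIMEOUT event. */
  synchronized void acquire(Runnable onTimeout, long timeoutMs) {
    pending = timer.schedule(onTimeout, timeoutMs, TimeUnit.MILLISECONDS);
  }

  /** On CREATED: cancel the timer so the timeout action never runs. */
  synchronized void release() {
    if (pending != null) {
      pending.cancel(false);
    }
  }

  /** Mirrors the shutdown hook on the selector. */
  void shutdown() {
    timer.shutdownNow();
  }
}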
+
+  public void shutdown() {
+    if (pipelineLeaseManager != null) {
+      pipelineLeaseManager.shutdown();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index a8f8b20..c726ef6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -72,7 +71,7 @@ public class RatisManagerImpl extends PipelineManager {
    * Allocates a new ratis Pipeline from the free nodes.
    *
    * @param factor - One or Three
-   * @return PipelineChannel.
+   * @return Pipeline.
    */
   public Pipeline allocatePipeline(ReplicationFactor factor) {
     List<DatanodeDetails> newNodesList = new LinkedList<>();
@@ -89,35 +88,23 @@ public class RatisManagerImpl extends PipelineManager {
           // further allocations
           ratisMembers.addAll(newNodesList);
           LOG.info("Allocating a new ratis pipeline of size: {}", count);
-          // Start all channel names with "Ratis", easy to grep the logs.
+          // Start all pipeline names with "Ratis", easy to grep the logs.
           String pipelineName = PREFIX +
               UUID.randomUUID().toString().substring(PREFIX.length());
-          Pipeline pipeline=
-              PipelineSelector.newPipelineFromNodes(newNodesList,
-              LifeCycleState.OPEN, ReplicationType.RATIS, factor, pipelineName);
-          try (XceiverClientRatis client =
-              XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
-            client.createPipeline(pipeline.getPipelineName(), newNodesList);
-          } catch (IOException e) {
-            return null;
-          }
-          return pipeline;
+          return PipelineSelector.newPipelineFromNodes(newNodesList,
+              ReplicationType.RATIS, factor, pipelineName);
         }
       }
     }
     return null;
   }
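
allocatePipeline above carves a fixed-size member group out of healthy datanodes that are not yet in any Ratis pipeline, with ratisMembers acting as the exclusion set. A minimal model of that selection step (a generic, hypothetical helper, not code from this patch):

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;

/** Illustrative: pick `count` nodes not already claimed by a pipeline. */
static <N> List<N> selectFreeNodes(List<N> healthy, Set<N> claimed, int count) {
  List<N> picked = new ArrayList<>();
  for (N node : healthy) {
    if (!claimed.contains(node)) {
      picked.add(node);
      if (picked.size() == count) {
        claimed.addAll(picked);    // exclude them from future allocations
        return picked;
      }
    }
  }
  return Collections.emptyList(); // not enough free nodes; caller returns null
}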
 
-  /**
-   * Creates a pipeline from a specified set of Nodes.
-   *
-   * @param pipelineID - Name of the pipeline
-   * @param datanodes - The list of datanodes that make this pipeline.
-   */
-  @Override
-  public void createPipeline(String pipelineID,
-                             List<DatanodeDetails> datanodes) {
-
+  public void initializePipeline(Pipeline pipeline) throws IOException {
+    // TODO: move the initialization from SCM to the client
+    try (XceiverClientRatis client =
+        XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
+      client.createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
+    }
   }
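
initializePipeline above relies on try-with-resources so that the short-lived XceiverClientRatis is closed whether or not createPipeline succeeds, while any IOException still propagates to the caller. The same shape in isolation, with Client as a placeholder for any closeable RPC handle (illustrative names only):

interface Client extends AutoCloseable {
  void createPipeline(String name) throws java.io.IOException;
  @Override void close();        // idempotent close, throws nothing
}

/** Illustrative: one-shot RPC against a resource that must always be closed. */
static void initialize(Client client, String name) throws java.io.IOException {
  try (Client c = client) {      // closed on success and on failure alike
    c.createPipeline(name);
  }
}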
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index cf691bf..bb4951f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -86,29 +85,19 @@ public class StandaloneManagerImpl extends PipelineManager {
           // once a datanode has been added to a pipeline, exclude it from
           // further allocations
           standAloneMembers.addAll(newNodesList);
-          LOG.info("Allocating a new standalone pipeline channel of size: {}",
-              count);
-          String channelName =
+          LOG.info("Allocating a new standalone pipeline of size: {}", count);
+          String pipelineName =
               "SA-" + UUID.randomUUID().toString().substring(3);
           return PipelineSelector.newPipelineFromNodes(newNodesList,
-              LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
-              ReplicationFactor.ONE, channelName);
+              ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
         }
       }
     }
     return null;
   }
 
-  /**
-   * Creates a pipeline from a specified set of Nodes.
-   *
-   * @param pipelineID - Name of the pipeline
-   * @param datanodes - The list of datanodes that make this pipeline.
-   */
-  @Override
-  public void createPipeline(String pipelineID,
-                             List<DatanodeDetails> datanodes) {
-    //return newPipelineFromNodes(datanodes, pipelineID);
+  public void initializePipeline(Pipeline pipeline) {
+    // Nothing to be done for standalone pipeline
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index bc3505f..ffac6d5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers
     .ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.junit.AfterClass;
@@ -51,6 +53,7 @@ public class TestNode2PipelineMap {
   private static ContainerWithPipeline ratisContainer;
   private static ContainerStateMap stateMap;
   private static ContainerMapping mapping;
+  private static PipelineSelector pipelineSelector;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -66,6 +69,7 @@ public class TestNode2PipelineMap {
     mapping = (ContainerMapping)scm.getScmContainerManager();
     stateMap = mapping.getStateManager().getContainerStateMap();
     ratisContainer = mapping.allocateContainer(RATIS, THREE, "testOwner");
+    pipelineSelector = mapping.getPipelineSelector();
   }
 
   /**
@@ -113,5 +117,15 @@ public class TestNode2PipelineMap {
     NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline(
         ratisContainer.getPipeline().getPipelineName());
     Assert.assertEquals(0, set2.size());
+
+    try {
+      pipelineSelector.updatePipelineState(ratisContainer.getPipeline(),
+          HddsProtos.LifeCycleEvent.CLOSE);
+      Assert.fail("Closing a pipeline without FINALIZE should fail.");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof SCMException);
+      Assert.assertEquals(
+          SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE,
+          ((SCMException) e).getResult());
+    }
   }
 }
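
The negative test above pins down one illegal transition: CLOSE straight from the open state is rejected with FAILED_TO_CHANGE_PIPELINE_STATE. For contrast, the legal teardown order implied by the state machine is FINALIZE followed by CLOSE; a hypothetical follow-up assertion, not part of this patch, could exercise the happy path like this:

// Hypothetical happy path: FINALIZE first, then CLOSE is accepted.
pipelineSelector.updatePipelineState(ratisContainer.getPipeline(),
    HddsProtos.LifeCycleEvent.FINALIZE);
pipelineSelector.updatePipelineState(ratisContainer.getPipeline(),
    HddsProtos.LifeCycleEvent.CLOSE);
Assert.assertEquals(HddsProtos.LifeCycleState.CLOSED,
    ratisContainer.getPipeline().getLifeCycleState());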


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org