You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/07/20 21:38:13 UTC

[2/3] hadoop git commit: HDDS-239. Add PipelineStateManager to track pipeline state transition. Contributed by Mukul Kumar Singh.

HDDS-239. Add PipelineStateManager to track pipeline state transition. Contributed by Mukul Kumar Singh.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9be25e34
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9be25e34
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9be25e34

Branch: refs/heads/trunk
Commit: 9be25e347683d26e0575458c7f470c76fd4d951b
Parents: d2acf8d
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Fri Jul 20 14:22:02 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Fri Jul 20 14:22:02 2018 -0700

----------------------------------------------------------------------
 .../apache/hadoop/hdds/scm/ScmConfigKeys.java   |   5 +
 .../scm/container/common/helpers/Pipeline.java  |   7 +
 .../common/src/main/resources/ozone-default.xml |  12 ++
 .../hdds/scm/container/ContainerMapping.java    |   4 +
 .../hdds/scm/exceptions/SCMException.java       |   1 +
 .../hdds/scm/pipelines/PipelineManager.java     |  64 +++---
 .../hdds/scm/pipelines/PipelineSelector.java    | 212 ++++++++++++++++---
 .../scm/pipelines/ratis/RatisManagerImpl.java   |  33 +--
 .../standalone/StandaloneManagerImpl.java       |  21 +-
 .../hdds/scm/pipeline/TestNode2PipelineMap.java |  14 ++
 10 files changed, 273 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 71184cf..6e940ad 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -236,6 +236,11 @@ public final class ScmConfigKeys {
   public static final String
       OZONE_SCM_CONTAINER_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
 
+  public static final String OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT =
+      "ozone.scm.pipeline.creation.lease.timeout";
+
+  public static final String
+      OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT = "60s";
 
   public static final String OZONE_SCM_BLOCK_DELETION_MAX_RETRY =
       "ozone.scm.block.deletion.max.retry";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index c5794f4..534c9fd 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -214,6 +214,13 @@ public class Pipeline {
   }
 
   /**
+   * Update the State of the pipeline.
+   */
+  public void setLifeCycleState(HddsProtos.LifeCycleState nextState) {
+     lifeCycleState = nextState;
+  }
+
+  /**
    * Gets the pipeline Name.
    *
    * @return - Name of the pipeline

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 5a1d26a..69a382a 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -1085,5 +1085,17 @@
       executed since last report. Unit could be defined with
       postfix (ns,ms,s,m,h,d)</description>
   </property>
+  <property>
+    <name>ozone.scm.pipeline.creation.lease.timeout</name>
+    <value>60s</value>
+    <tag>OZONE, SCM, PIPELINE</tag>
+    <description>
+      Pipeline creation lease timeout used by SCM; a value without a unit
+      postfix is read as milliseconds. When the CREATE event happens the
+      pipeline moves from ALLOCATED to CREATING. SCM then waits for the
+      configured time for the CREATED event; if it does not arrive, the
+      TIMEOUT event moves the pipeline to CLOSED.
+    </description>
+  </property>
 
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 26f4d86..f07d22b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -658,6 +658,10 @@ public class ContainerMapping implements Mapping {
     if (containerStore != null) {
       containerStore.close();
     }
+
+    if (pipelineSelector != null) {
+      pipelineSelector.shutdown();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
index d7d70ef..0085542 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java
@@ -107,6 +107,7 @@ public class SCMException extends IOException {
     FAILED_TO_LOAD_OPEN_CONTAINER,
     FAILED_TO_ALLOCATE_CONTAINER,
     FAILED_TO_CHANGE_CONTAINER_STATE,
+    FAILED_TO_CHANGE_PIPELINE_STATE,
     CONTAINER_EXISTS,
     FAILED_TO_FIND_CONTAINER,
     FAILED_TO_FIND_CONTAINER_WITH_SPACE,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index a041973..77d8211 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -59,41 +59,16 @@ public abstract class PipelineManager {
    * @return a Pipeline.
    */
   public synchronized final Pipeline getPipeline(
-      ReplicationFactor replicationFactor, ReplicationType replicationType)
-      throws IOException {
-    /**
-     * In the Ozone world, we have a very simple policy.
-     *
-     * 1. Try to create a pipeline if there are enough free nodes.
-     *
-     * 2. This allows all nodes to part of a pipeline quickly.
-     *
-     * 3. if there are not enough free nodes, return pipeline in a
-     * round-robin fashion.
-     *
-     * TODO: Might have to come up with a better algorithm than this.
-     * Create a new placement policy that returns pipelines in round robin
-     * fashion.
-     */
-    Pipeline pipeline = allocatePipeline(replicationFactor);
+      ReplicationFactor replicationFactor, ReplicationType replicationType) {
+    Pipeline pipeline = findOpenPipeline(replicationType, replicationFactor);
     if (pipeline != null) {
-      LOG.debug("created new pipeline:{} for container with " +
+      LOG.debug("re-used pipeline:{} for container with " +
               "replicationType:{} replicationFactor:{}",
           pipeline.getPipelineName(), replicationType, replicationFactor);
-      activePipelines.add(pipeline);
-      activePipelineMap.put(pipeline.getPipelineName(), pipeline);
-      node2PipelineMap.addPipeline(pipeline);
-    } else {
-      pipeline = findOpenPipeline(replicationType, replicationFactor);
-      if (pipeline != null) {
-        LOG.debug("re-used pipeline:{} for container with " +
-                "replicationType:{} replicationFactor:{}",
-            pipeline.getPipelineName(), replicationType, replicationFactor);
-      }
     }
     if (pipeline == null) {
       LOG.error("Get pipeline call failed. We are not able to find" +
-              "free nodes or operational pipeline.");
+              " operational pipeline.");
       return null;
     } else {
       return pipeline;
@@ -109,7 +84,7 @@ public abstract class PipelineManager {
   public synchronized final Pipeline getPipeline(String pipelineName) {
     Pipeline pipeline = null;
 
-    // 1. Check if pipeline channel already exists
+    // 1. Check if pipeline already exists
     if (activePipelineMap.containsKey(pipelineName)) {
       pipeline = activePipelineMap.get(pipelineName);
       LOG.debug("Returning pipeline for pipelineName:{}", pipelineName);
@@ -132,7 +107,13 @@ public abstract class PipelineManager {
   }
 
   public abstract Pipeline allocatePipeline(
-      ReplicationFactor replicationFactor) throws IOException;
+      ReplicationFactor replicationFactor);
+
+  /**
+   * Initialize the pipeline
+   * TODO: move the initialization to Ozone Client later
+   */
+  public abstract void initializePipeline(Pipeline pipeline) throws IOException;
 
   public void removePipeline(Pipeline pipeline) {
     activePipelines.remove(pipeline);
@@ -179,12 +160,23 @@ public abstract class PipelineManager {
   }
 
   /**
-   * Creates a pipeline from a specified set of Nodes.
-   * @param pipelineID - Name of the pipeline
-   * @param datanodes - The list of datanodes that make this pipeline.
+   * Creates a pipeline with a specified replication factor and type.
+   * @param replicationFactor - Replication Factor.
+   * @param replicationType - Replication Type.
    */
-  public abstract void createPipeline(String pipelineID,
-      List<DatanodeDetails> datanodes) throws IOException;
+  public Pipeline createPipeline(ReplicationFactor replicationFactor,
+      ReplicationType replicationType) throws IOException {
+    Pipeline pipeline = allocatePipeline(replicationFactor);
+    if (pipeline != null) {
+      LOG.debug("created new pipeline:{} for container with "
+              + "replicationType:{} replicationFactor:{}",
+          pipeline.getPipelineName(), replicationType, replicationFactor);
+      activePipelines.add(pipeline);
+      activePipelineMap.put(pipeline.getPipelineName(), pipeline);
+      node2PipelineMap.addPipeline(pipeline);
+    }
+    return pipeline;
+  }
 
   /**
    * Close the  pipeline with the given clusterId.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index 2955af5..08710e7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .ContainerPlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms
     .SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipelines.ratis.RatisManagerImpl;
 import org.apache.hadoop.hdds.scm.pipelines.standalone.StandaloneManagerImpl;
@@ -33,17 +34,28 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.statemachine
+    .InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.apache.hadoop.ozone.lease.Lease;
+import org.apache.hadoop.ozone.lease.LeaseException;
+import org.apache.hadoop.ozone.lease.LeaseManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_CHANGE_PIPELINE_STATE;
+
 /**
  * Sends the request to the right pipeline manager.
  */
@@ -57,6 +69,10 @@ public class PipelineSelector {
   private final StandaloneManagerImpl standaloneManager;
   private final long containerSize;
   private final Node2PipelineMap node2PipelineMap;
+  private final LeaseManager<Pipeline> pipelineLeaseManager;
+  private final StateMachine<LifeCycleState,
+      HddsProtos.LifeCycleEvent> stateMachine;
+
   /**
    * Constructs a pipeline Selector.
    *
@@ -77,6 +93,74 @@ public class PipelineSelector {
     this.ratisManager =
         new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
             conf, node2PipelineMap);
+    // Initialize the pipeline state machine.
+    Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
+    long pipelineCreationLeaseTimeout = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    LOG.trace("Starting Pipeline Lease Manager.");
+    pipelineLeaseManager = new LeaseManager<>(pipelineCreationLeaseTimeout);
+    pipelineLeaseManager.start();
+
+    // These are the steady states of a pipeline.
+    finalStates.add(HddsProtos.LifeCycleState.OPEN);
+    finalStates.add(HddsProtos.LifeCycleState.CLOSED);
+
+    this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
+        finalStates);
+    initializeStateMachine();
+  }
+
+  /**
+   * Event and State Transition Mapping:
+   *
+   * State: ALLOCATED ---------------> CREATING
+   * Event:                CREATE
+   *
+   * State: CREATING  ---------------> OPEN
+   * Event:               CREATED
+   *
+   * State: OPEN      ---------------> CLOSING
+   * Event:               FINALIZE
+   *
+   * State: CLOSING   ---------------> CLOSED
+   * Event:                CLOSE
+   *
+   * State: CREATING  ---------------> CLOSED
+   * Event:               TIMEOUT
+   *
+   *
+   * Pipeline State Flow:
+   *
+   * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
+   *            (CREATE)     | (CREATED)     (FINALIZE)   |
+   *                         |                            |
+   *                         |                            |
+   *                         |(TIMEOUT)                   |(CLOSE)
+   *                         |                            |
+   *                         +--------> [CLOSED] <--------+
+   */
+  private void initializeStateMachine() {
+    stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
+        HddsProtos.LifeCycleState.CREATING,
+        HddsProtos.LifeCycleEvent.CREATE);
+
+    stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
+        HddsProtos.LifeCycleState.OPEN,
+        HddsProtos.LifeCycleEvent.CREATED);
+
+    stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
+        HddsProtos.LifeCycleState.CLOSING,
+        HddsProtos.LifeCycleEvent.FINALIZE);
+
+    stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
+        HddsProtos.LifeCycleState.CLOSED,
+        HddsProtos.LifeCycleEvent.CLOSE);
+
+    stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
+        HddsProtos.LifeCycleState.CLOSED,
+        HddsProtos.LifeCycleEvent.TIMEOUT);
   }
 
   /**
@@ -88,15 +172,14 @@ public class PipelineSelector {
    * @return pipeline corresponding to nodes
    */
   public static Pipeline newPipelineFromNodes(
-      List<DatanodeDetails> nodes, LifeCycleState state,
-      ReplicationType replicationType, ReplicationFactor replicationFactor,
-      String name) {
+      List<DatanodeDetails> nodes, ReplicationType replicationType,
+      ReplicationFactor replicationFactor, String name) {
     Preconditions.checkNotNull(nodes);
     Preconditions.checkArgument(nodes.size() > 0);
     String leaderId = nodes.get(0).getUuidString();
-    Pipeline
-        pipeline = new Pipeline(leaderId, state, replicationType,
-        replicationFactor, name);
+    // A new pipeline always starts in allocated state
+    Pipeline pipeline = new Pipeline(leaderId, LifeCycleState.ALLOCATED,
+        replicationType, replicationFactor, name);
     for (DatanodeDetails node : nodes) {
       pipeline.addMember(node);
     }
@@ -175,8 +258,35 @@ public class PipelineSelector {
     LOG.debug("Getting replication pipeline forReplicationType {} :" +
             " ReplicationFactor {}", replicationType.toString(),
         replicationFactor.toString());
-    return manager.
-        getPipeline(replicationFactor, replicationType);
+
+    /**
+     * In the Ozone world, we have a very simple policy.
+     *
+     * 1. Try to create a pipeline if there are enough free nodes.
+     *
+     * 2. This allows all nodes to be part of a pipeline quickly.
+     *
+     * 3. if there are not enough free nodes, return already allocated pipeline
+     * in a round-robin fashion.
+     *
+     * TODO: Might have to come up with a better algorithm than this.
+     * Create a new placement policy that returns pipelines in round robin
+     * fashion.
+     */
+    Pipeline pipeline =
+        manager.createPipeline(replicationFactor, replicationType);
+    if (pipeline == null) {
+      // try to return a pipeline from already allocated pipelines
+      pipeline = manager.getPipeline(replicationFactor, replicationType);
+    } else {
+      // if a new pipeline is created, initialize its state machine
+      updatePipelineState(pipeline,HddsProtos.LifeCycleEvent.CREATE);
+
+      //TODO: move the initialization of pipeline to Ozone Client
+      manager.initializePipeline(pipeline);
+      updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATED);
+    }
+    return pipeline;
   }
 
   /**
@@ -194,19 +304,6 @@ public class PipelineSelector {
         " pipelineName:{}", replicationType, pipelineName);
     return manager.getPipeline(pipelineName);
   }
-  /**
-   * Creates a pipeline from a specified set of Nodes.
-   */
-
-  public void createPipeline(ReplicationType replicationType, String
-      pipelineID, List<DatanodeDetails> datanodes) throws IOException {
-    PipelineManager manager = getPipelineManager(replicationType);
-    Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
-    LOG.debug("Creating a pipeline: {} with nodes:{}", pipelineID,
-        datanodes.stream().map(DatanodeDetails::toString)
-            .collect(Collectors.joining(",")));
-    manager.createPipeline(pipelineID, datanodes);
-  }
 
   /**
    * Close the  pipeline with the given clusterId.
@@ -251,12 +348,77 @@ public class PipelineSelector {
   }
 
   public void removePipeline(UUID dnId) {
-    Set<Pipeline> pipelineChannelSet =
+    Set<Pipeline> pipelineSet =
         node2PipelineMap.getPipelines(dnId);
-    for (Pipeline pipelineChannel : pipelineChannelSet) {
-      getPipelineManager(pipelineChannel.getType())
-          .removePipeline(pipelineChannel);
+    for (Pipeline pipeline : pipelineSet) {
+      getPipelineManager(pipeline.getType())
+          .removePipeline(pipeline);
     }
     node2PipelineMap.removeDatanode(dnId);
   }
+
+  /**
+   * Update the Pipeline State to the next state.
+   *
+   * @param pipeline - Pipeline
+   * @param event - LifeCycle Event
+   * @throws IOException on failure (SCMException on invalid transition).
+   */
+  public void updatePipelineState(Pipeline pipeline,
+      HddsProtos.LifeCycleEvent event) throws IOException {
+    HddsProtos.LifeCycleState newState;
+    try {
+      newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
+    } catch (InvalidStateTransitionException ex) {
+      String error = String.format("Failed to update pipeline state %s, " +
+              "reason: invalid state transition from state: %s upon " +
+              "event: %s.",
+          pipeline.getPipelineName(), pipeline.getLifeCycleState(), event);
+      LOG.error(error);
+      throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
+    }
+
+    // This is a post condition after executing getNextState.
+    Preconditions.checkNotNull(newState);
+    Preconditions.checkNotNull(pipeline);
+    try {
+      switch (event) {
+      case CREATE:
+        // Acquire lease on pipeline
+        Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline);
+        // Register callback to be executed in case of timeout
+        pipelineLease.registerCallBack(() -> {
+          updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.TIMEOUT);
+          return null;
+        });
+        break;
+      case CREATED:
+        // Release the lease on pipeline
+        pipelineLeaseManager.release(pipeline);
+        break;
+
+      case FINALIZE:
+        //TODO: cleanup pipeline by closing all the containers on the pipeline
+        break;
+
+      case CLOSE:
+      case TIMEOUT:
+        // TODO: Release the nodes here when pipelines are destroyed
+        break;
+      default:
+        throw new SCMException("Unsupported pipeline LifeCycleEvent.",
+            FAILED_TO_CHANGE_PIPELINE_STATE);
+      }
+
+      pipeline.setLifeCycleState(newState);
+    } catch (LeaseException e) {
+      throw new IOException("Lease Exception.", e);
+    }
+  }
+
+  public void shutdown() {
+    if (pipelineLeaseManager != null) {
+      pipelineLeaseManager.shutdown();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index a8f8b20..c726ef6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -72,7 +71,7 @@ public class RatisManagerImpl extends PipelineManager {
    * Allocates a new ratis Pipeline from the free nodes.
    *
    * @param factor - One or Three
-   * @return PipelineChannel.
+   * @return Pipeline.
    */
   public Pipeline allocatePipeline(ReplicationFactor factor) {
     List<DatanodeDetails> newNodesList = new LinkedList<>();
@@ -89,35 +88,23 @@ public class RatisManagerImpl extends PipelineManager {
           // further allocations
           ratisMembers.addAll(newNodesList);
           LOG.info("Allocating a new ratis pipeline of size: {}", count);
-          // Start all channel names with "Ratis", easy to grep the logs.
+          // Start all pipeline names with "Ratis", easy to grep the logs.
           String pipelineName = PREFIX +
               UUID.randomUUID().toString().substring(PREFIX.length());
-          Pipeline pipeline=
-              PipelineSelector.newPipelineFromNodes(newNodesList,
-              LifeCycleState.OPEN, ReplicationType.RATIS, factor, pipelineName);
-          try (XceiverClientRatis client =
-              XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
-            client.createPipeline(pipeline.getPipelineName(), newNodesList);
-          } catch (IOException e) {
-            return null;
-          }
-          return pipeline;
+          return PipelineSelector.newPipelineFromNodes(newNodesList,
+              ReplicationType.RATIS, factor, pipelineName);
         }
       }
     }
     return null;
   }
 
-  /**
-   * Creates a pipeline from a specified set of Nodes.
-   *
-   * @param pipelineID - Name of the pipeline
-   * @param datanodes - The list of datanodes that make this pipeline.
-   */
-  @Override
-  public void createPipeline(String pipelineID,
-                             List<DatanodeDetails> datanodes) {
-
+  public void initializePipeline(Pipeline pipeline) throws IOException {
+    //TODO:move the initialization from SCM to client
+    try (XceiverClientRatis client =
+        XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
+      client.createPipeline(pipeline.getPipelineName(), pipeline.getMachines());
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index cf691bf..bb4951f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -25,7 +25,6 @@ import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -86,29 +85,19 @@ public class StandaloneManagerImpl extends PipelineManager {
           // once a datanode has been added to a pipeline, exclude it from
           // further allocations
           standAloneMembers.addAll(newNodesList);
-          LOG.info("Allocating a new standalone pipeline channel of size: {}",
-              count);
-          String channelName =
+          LOG.info("Allocating a new standalone pipeline of size: {}", count);
+          String pipelineName =
               "SA-" + UUID.randomUUID().toString().substring(3);
           return PipelineSelector.newPipelineFromNodes(newNodesList,
-              LifeCycleState.OPEN, ReplicationType.STAND_ALONE,
-              ReplicationFactor.ONE, channelName);
+              ReplicationType.STAND_ALONE, ReplicationFactor.ONE, pipelineName);
         }
       }
     }
     return null;
   }
 
-  /**
-   * Creates a pipeline from a specified set of Nodes.
-   *
-   * @param pipelineID - Name of the pipeline
-   * @param datanodes - The list of datanodes that make this pipeline.
-   */
-  @Override
-  public void createPipeline(String pipelineID,
-                             List<DatanodeDetails> datanodes) {
-    //return newPipelineFromNodes(datanodes, pipelineID);
+  public void initializePipeline(Pipeline pipeline) {
+    // Nothing to be done for standalone pipeline
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9be25e34/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index bc3505f..ffac6d5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -26,6 +26,8 @@ import org.apache.hadoop.hdds.scm.container.common.helpers
     .ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.junit.AfterClass;
@@ -51,6 +53,7 @@ public class TestNode2PipelineMap {
   private static ContainerWithPipeline ratisContainer;
   private static ContainerStateMap stateMap;
   private static ContainerMapping mapping;
+  private static PipelineSelector pipelineSelector;
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -66,6 +69,7 @@ public class TestNode2PipelineMap {
     mapping = (ContainerMapping)scm.getScmContainerManager();
     stateMap = mapping.getStateManager().getContainerStateMap();
     ratisContainer = mapping.allocateContainer(RATIS, THREE, "testOwner");
+    pipelineSelector = mapping.getPipelineSelector();
   }
 
   /**
@@ -113,5 +117,15 @@ public class TestNode2PipelineMap {
     NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline(
         ratisContainer.getPipeline().getPipelineName());
     Assert.assertEquals(0, set2.size());
+
+    try {
+      pipelineSelector.updatePipelineState(ratisContainer.getPipeline(),
+          HddsProtos.LifeCycleEvent.CLOSE);
+      Assert.fail("closing of pipeline without finalize should fail");
+    } catch (Exception e) {
+      Assert.assertTrue(e instanceof SCMException);
+      Assert.assertEquals(((SCMException)e).getResult(),
+          SCMException.ResultCodes.FAILED_TO_CHANGE_PIPELINE_STATE);
+    }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org