You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2018/09/19 16:14:07 UTC

hadoop git commit: HDDS-461. Container remains in CLOSING state in SCM forever. Contributed by Shashikant Banerjee.

Repository: hadoop
Updated Branches:
  refs/heads/trunk b3c5221f3 -> 61a4b07bd


HDDS-461. Container remains in CLOSING state in SCM forever. Contributed by Shashikant Banerjee.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/61a4b07b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/61a4b07b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/61a4b07b

Branch: refs/heads/trunk
Commit: 61a4b07bdaf68d45c7c0bba0485d95a1d908a73d
Parents: b3c5221
Author: Nanda kumar <na...@apache.org>
Authored: Wed Sep 19 21:43:44 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Wed Sep 19 21:43:44 2018 +0530

----------------------------------------------------------------------
 .../common/statemachine/StateContext.java       |  42 +++++--
 .../CloseContainerCommandHandler.java           |  17 ++-
 .../transport/server/XceiverServerGrpc.java     |  12 +-
 .../server/ratis/XceiverServerRatis.java        |  31 +++--
 .../container/ozoneimpl/OzoneContainer.java     | 116 ++++++-------------
 .../commands/CloseContainerCommand.java         |  23 +---
 .../container/CloseContainerEventHandler.java   |  90 ++++++++------
 .../TestCloseContainerByPipeline.java           |   5 +-
 8 files changed, 175 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/61a4b07b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 9d5a778..47c2492 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine;
 
+import com.google.common.base.Preconditions;
 import com.google.protobuf.GeneratedMessage;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
+import java.util.ArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
@@ -50,7 +52,6 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
 
@@ -206,9 +207,18 @@ public class StateContext {
    * @return List<reports>
    */
   public List<GeneratedMessage> getReports(int maxLimit) {
+    List<GeneratedMessage> reportList = new ArrayList<>();
     synchronized (reports) {
-      return reports.parallelStream().limit(maxLimit)
-          .collect(Collectors.toList());
+      if (!reports.isEmpty()) {
+        int size = reports.size();
+        int limit = size > maxLimit ? maxLimit : size;
+        for (int count = 0; count < limit; count++) {
+          GeneratedMessage report = reports.poll();
+          Preconditions.checkNotNull(report);
+          reportList.add(report);
+        }
+      }
+      return reportList;
     }
   }
 
@@ -254,9 +264,20 @@ public class StateContext {
    * @return List<ContainerAction>
    */
   public List<ContainerAction> getPendingContainerAction(int maxLimit) {
+    List<ContainerAction> containerActionList = new ArrayList<>();
     synchronized (containerActions) {
-      return containerActions.parallelStream().limit(maxLimit)
-          .collect(Collectors.toList());
+      if (!containerActions.isEmpty()) {
+        int size = containerActions.size();
+        int limit = size > maxLimit ? maxLimit : size;
+        for (int count = 0; count < limit; count++) {
+          // we need to remove the action from the containerAction queue
+          // as well
+          ContainerAction action = containerActions.poll();
+          Preconditions.checkNotNull(action);
+          containerActionList.add(action);
+        }
+      }
+      return containerActionList;
     }
   }
 
@@ -295,9 +316,16 @@ public class StateContext {
    * @return List<ContainerAction>
    */
   public List<PipelineAction> getPendingPipelineAction(int maxLimit) {
+    List<PipelineAction> pipelineActionList = new ArrayList<>();
     synchronized (pipelineActions) {
-      return pipelineActions.parallelStream().limit(maxLimit)
-          .collect(Collectors.toList());
+      if (!pipelineActions.isEmpty()) {
+        int size = pipelineActions.size();
+        int limit = size > maxLimit ? maxLimit : size;
+        for (int count = 0; count < limit; count++) {
+          pipelineActionList.add(pipelineActions.poll());
+        }
+      }
+      return pipelineActionList;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61a4b07b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 030a357..d4e13ee 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -89,9 +90,21 @@ public class CloseContainerCommandHandler implements CommandHandler {
       // submit the close container request for the XceiverServer to handle
       container.submitContainerRequest(
           request.build(), replicationType, pipelineID);
-      cmdExecuted = true;
     } catch (Exception e) {
-      LOG.error("Can't close container " + containerID, e);
+      if (e instanceof NotLeaderException) {
+        // If the particular datanode is not the Ratis leader, the close
+        // container command will not be executed by the follower but will be
+        // executed by Ratis stateMachine transactions via leader to follower.
+        // There can also be a case where the datanode is in candidate state.
+        // In these situations, NotLeaderException is thrown. Remove the status
+        // from cmdStatus Map here so that it will be retried only by SCM if the
+        // leader could not close the container after a certain time.
+        context.removeCommandStatus(containerID);
+        LOG.info(e.getLocalizedMessage());
+      } else {
+        LOG.error("Can't close container " + containerID, e);
+        cmdExecuted = false;
+      }
     } finally {
       updateCommandStatus(context, command, cmdExecuted, LOG);
       long endTime = Time.monotonicNow();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61a4b07b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 83e742c..c51da98 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -21,12 +21,15 @@ package org.apache.hadoop.ozone.container.common.transport.server;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
     .ContainerCommandRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReport;
 import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.
+    StorageContainerException;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
 
@@ -128,8 +131,13 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
 
   @Override
   public void submitRequest(ContainerCommandRequestProto request,
-      HddsProtos.PipelineID pipelineID) {
-    storageContainer.dispatch(request);
+      HddsProtos.PipelineID pipelineID) throws IOException {
+    ContainerProtos.ContainerCommandResponseProto response =
+        storageContainer.dispatch(request);
+    if (response.getResult() != ContainerProtos.Result.SUCCESS) {
+      throw new StorageContainerException(response.getMessage(),
+          response.getResult());
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61a4b07b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index d88995b..c2ef504 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -74,7 +74,6 @@ import java.util.List;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -319,39 +318,35 @@ public final class XceiverServerRatis implements XceiverServerSpi {
     return server;
   }
 
-  private void processReply(RaftClientReply reply) {
-
+  private void processReply(RaftClientReply reply) throws IOException {
     // NotLeader exception is thrown only when the raft server to which the
     // request is submitted is not the leader. The request will be rejected
-    // and will eventually be executed once the request comnes via the leader
+    // and will eventually be executed once the request comes via the leader
     // node.
     NotLeaderException notLeaderException = reply.getNotLeaderException();
     if (notLeaderException != null) {
-      LOG.info(reply.getNotLeaderException().getLocalizedMessage());
+      throw notLeaderException;
     }
     StateMachineException stateMachineException =
         reply.getStateMachineException();
     if (stateMachineException != null) {
-      // In case the request could not be completed, StateMachine Exception
-      // will be thrown. For now, Just log the message.
-      // If the container could not be closed, SCM will come to know
-      // via containerReports. CloseContainer should be re tried via SCM.
-      LOG.error(stateMachineException.getLocalizedMessage());
+      throw stateMachineException;
     }
   }
 
   @Override
-  public void submitRequest(
-      ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID)
-      throws IOException {
-    // ReplicationLevel.MAJORITY ensures the transactions corresponding to
-    // the request here are applied on all the raft servers.
+  public void submitRequest(ContainerCommandRequestProto request,
+      HddsProtos.PipelineID pipelineID) throws IOException {
+    RaftClientReply reply;
     RaftClientRequest raftClientRequest =
         createRaftClientRequest(request, pipelineID,
             RaftClientRequest.writeRequestType(replicationLevel));
-    CompletableFuture<RaftClientReply> reply =
-        server.submitClientRequestAsync(raftClientRequest);
-    reply.thenAccept(this::processReply);
+    try {
+      reply = server.submitClientRequestAsync(raftClientRequest).get();
+    } catch (Exception e) {
+      throw new IOException(e.getMessage(), e);
+    }
+    processReply(reply);
   }
 
   private RaftClientRequest createRaftClientRequest(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61a4b07b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index ebacf75..da58772 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -23,7 +23,8 @@ import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
@@ -46,12 +47,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.*;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.Map;
 
 import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
 
 /**
- * Ozone main class sets up the network server and initializes the container
+ * Ozone main class sets up the network servers and initializes the container
  * layer.
  */
 public class OzoneContainer {
@@ -64,7 +67,7 @@ public class OzoneContainer {
   private final OzoneConfiguration config;
   private final VolumeSet volumeSet;
   private final ContainerSet containerSet;
-  private final XceiverServerSpi[] server;
+  private final Map<ReplicationType, XceiverServerSpi> servers;
 
   /**
    * Construct OzoneContainer object.
@@ -82,14 +85,13 @@ public class OzoneContainer {
     buildContainerSet();
     hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet,
         context);
-    server = new XceiverServerSpi[]{
-        new XceiverServerGrpc(datanodeDetails, this.config, this
-            .hddsDispatcher, createReplicationService()),
-        XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this
-            .config, hddsDispatcher, context)
-    };
-
-
+    servers = new HashMap<>();
+    servers.put(ReplicationType.STAND_ALONE,
+        new XceiverServerGrpc(datanodeDetails, config, hddsDispatcher,
+            createReplicationService()));
+    servers.put(ReplicationType.RATIS, XceiverServerRatis
+        .newXceiverServerRatis(datanodeDetails, config, hddsDispatcher,
+            context));
   }
 
   private GrpcReplicationService createReplicationService() {
@@ -133,7 +135,7 @@ public class OzoneContainer {
    */
   public void start() throws IOException {
     LOG.info("Attempting to start container services.");
-    for (XceiverServerSpi serverinstance : server) {
+    for (XceiverServerSpi serverinstance : servers.values()) {
       serverinstance.start();
     }
     hddsDispatcher.init();
@@ -145,7 +147,7 @@ public class OzoneContainer {
   public void stop() {
     //TODO: at end of container IO integration work.
     LOG.info("Attempting to stop container services.");
-    for(XceiverServerSpi serverinstance: server) {
+    for(XceiverServerSpi serverinstance: servers.values()) {
       serverinstance.stop();
     }
     hddsDispatcher.shutdown();
@@ -169,7 +171,7 @@ public class OzoneContainer {
   public PipelineReportsProto getPipelineReport() {
     PipelineReportsProto.Builder pipelineReportsProto =
             PipelineReportsProto.newBuilder();
-    for (XceiverServerSpi serverInstance : server) {
+    for (XceiverServerSpi serverInstance : servers.values()) {
       pipelineReportsProto
               .addAllPipelineReport(serverInstance.getPipelineReport());
     }
@@ -181,82 +183,38 @@ public class OzoneContainer {
    * @param request
    * @param replicationType
    * @param pipelineID
-   * @throws IOException
    */
   public void submitContainerRequest(
       ContainerProtos.ContainerCommandRequestProto request,
-      HddsProtos.ReplicationType replicationType,
-      HddsProtos.PipelineID pipelineID) throws IOException {
-    XceiverServerSpi serverInstance;
-    long containerId = getContainerIdForCmd(request);
-    if (replicationType == HddsProtos.ReplicationType.RATIS) {
-      serverInstance = getRatisSerer();
-      Preconditions.checkNotNull(serverInstance);
-      serverInstance.submitRequest(request, pipelineID);
-      LOG.info("submitting {} request over RATIS server for container {}",
-          request.getCmdType(), containerId);
-    } else {
-      serverInstance = getStandaAloneSerer();
-      Preconditions.checkNotNull(serverInstance);
-      getStandaAloneSerer().submitRequest(request, pipelineID);
-      LOG.info(
-          "submitting {} request over STAND_ALONE server for container {}",
-          request.getCmdType(), containerId);
+      ReplicationType replicationType,
+      PipelineID pipelineID) throws IOException {
+    if (containerSet.getContainer(request.getContainerID())
+        .getContainerData().isClosed()) {
+      LOG.debug("Container {} is already closed", request.getContainerID());
+      // The first attempt of closing the container may have failed with
+      // NOT_LEADER_EXCEPTION, in which case SCM retries to check whether the
+      // container really got closed via Ratis. On such a retry, if the
+      // container is already closed via Ratis, we should just return.
+      // NOTE(review): no return statement follows; the request is resubmitted.
     }
-
+    LOG.info("submitting {} request over {} server for container {}",
+        request.getCmdType(), replicationType, request.getContainerID());
+    Preconditions.checkState(servers.containsKey(replicationType));
+    servers.get(replicationType).submitRequest(request, pipelineID);
   }
 
-  private long getContainerIdForCmd(
-      ContainerProtos.ContainerCommandRequestProto request)
-      throws IllegalArgumentException {
-    ContainerProtos.Type type = request.getCmdType();
-    switch (type) {
-    case CloseContainer:
-      return request.getContainerID();
-      // Right now, we handle only closeContainer via queuing it over the
-      // over the XceiVerServer. For all other commands we throw Illegal
-      // argument exception here. Will need to extend the switch cases
-      // in case we want add another commands here.
-    default:
-      throw new IllegalArgumentException("Cmd " + request.getCmdType()
-          + " not supported over HearBeat Response");
-    }
-  }
-
-  private XceiverServerSpi getRatisSerer() {
-    for (XceiverServerSpi serverInstance : server) {
-      if (serverInstance instanceof XceiverServerRatis) {
-        return serverInstance;
-      }
-    }
-    return null;
-  }
-
-  private XceiverServerSpi getStandaAloneSerer() {
-    for (XceiverServerSpi serverInstance : server) {
-      if (!(serverInstance instanceof XceiverServerRatis)) {
-        return serverInstance;
-      }
-    }
-    return null;
-  }
-
-  private int getPortbyType(HddsProtos.ReplicationType replicationType) {
-    for (XceiverServerSpi serverinstance : server) {
-      if (serverinstance.getServerType() == replicationType) {
-        return serverinstance.getIPCPort();
-      }
-    }
-    return INVALID_PORT;
+  private int getPortByType(ReplicationType replicationType) {
+    return servers.containsKey(replicationType) ?
+        servers.get(replicationType).getIPCPort() : INVALID_PORT;
   }
 
   /**
-   * Returns the container server IPC port.
+   * Returns the container servers IPC port.
    *
-   * @return Container server IPC port.
+   * @return Container servers IPC port.
    */
   public int getContainerServerPort() {
-    return getPortbyType(HddsProtos.ReplicationType.STAND_ALONE);
+    return getPortByType(ReplicationType.STAND_ALONE);
   }
 
   /**
@@ -265,7 +223,7 @@ public class OzoneContainer {
    * @return Ratis port.
    */
   public int getRatisContainerServerPort() {
-    return getPortbyType(HddsProtos.ReplicationType.RATIS);
+    return getPortByType(ReplicationType.RATIS);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61a4b07b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
index aaa5f11..c2c20a4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
@@ -30,25 +30,13 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
 public class CloseContainerCommand
     extends SCMCommand<CloseContainerCommandProto> {
 
-  private long containerID;
   private HddsProtos.ReplicationType replicationType;
   private PipelineID pipelineID;
 
   public CloseContainerCommand(long containerID,
       HddsProtos.ReplicationType replicationType,
       PipelineID pipelineID) {
-    super();
-    this.containerID = containerID;
-    this.replicationType = replicationType;
-    this.pipelineID = pipelineID;
-  }
-
-  // Should be called only for protobuf conversion
-  private CloseContainerCommand(long containerID,
-      HddsProtos.ReplicationType replicationType,
-      PipelineID pipelineID, long id) {
-    super(id);
-    this.containerID = containerID;
+    super(containerID);
     this.replicationType = replicationType;
     this.pipelineID = pipelineID;
   }
@@ -75,7 +63,7 @@ public class CloseContainerCommand
 
   public CloseContainerCommandProto getProto() {
     return CloseContainerCommandProto.newBuilder()
-        .setContainerID(containerID)
+        .setContainerID(getId())
         .setCmdId(getId())
         .setReplicationType(replicationType)
         .setPipelineID(pipelineID.getProtobuf())
@@ -85,13 +73,12 @@ public class CloseContainerCommand
   public static CloseContainerCommand getFromProtobuf(
       CloseContainerCommandProto closeContainerProto) {
     Preconditions.checkNotNull(closeContainerProto);
-    return new CloseContainerCommand(closeContainerProto.getContainerID(),
+    return new CloseContainerCommand(closeContainerProto.getCmdId(),
         closeContainerProto.getReplicationType(),
-        PipelineID.getFromProtobuf(closeContainerProto.getPipelineID()),
-        closeContainerProto.getCmdId());
+        PipelineID.getFromProtobuf(closeContainerProto.getPipelineID()));
   }
 
   public long getContainerID() {
-    return containerID;
+    return getId();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61a4b07b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index b94ce4f..7baecc4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -17,10 +17,10 @@
 package org.apache.hadoop.hdds.scm.container;
 
 import java.io.IOException;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
@@ -57,7 +57,7 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
 
     LOG.info("Close container Event triggered for container : {}",
         containerID.getId());
-    ContainerWithPipeline containerWithPipeline = null;
+    ContainerWithPipeline containerWithPipeline;
     ContainerInfo info;
     try {
       containerWithPipeline =
@@ -74,42 +74,66 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
       return;
     }
 
-    if (info.getState() == HddsProtos.LifeCycleState.OPEN) {
-      for (DatanodeDetails datanode :
-          containerWithPipeline.getPipeline().getMachines()) {
-        CommandForDatanode closeContainerCommand = new CommandForDatanode<>(
-            datanode.getUuid(),
-            new CloseContainerCommand(containerID.getId(),
-                info.getReplicationType(), info.getPipelineID()));
-        publisher.fireEvent(DATANODE_COMMAND, closeContainerCommand);
-        publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ, new
-            CloseContainerRetryableReq(containerID));
-      }
-      try {
-        // Finalize event will make sure the state of the container transitions
-        // from OPEN to CLOSING in containerStateManager.
-        containerManager.updateContainerState(containerID.getId(),
-            HddsProtos.LifeCycleEvent.FINALIZE);
-      } catch (IOException ex) {
-        LOG.error("Failed to update the container state to FINALIZE for"
-            + "container : {}" + containerID, ex);
-      }
-    } else if (info.getState() == HddsProtos.LifeCycleState.ALLOCATED) {
-      try {
-        // Create event will make sure the state of the container transitions
-        // from OPEN to CREATING in containerStateManager, this will move
-        // the container out of active allocation path.
+    HddsProtos.LifeCycleState state = info.getState();
+    try {
+      switch (state) {
+      case ALLOCATED:
+        // We cannot close a container in ALLOCATED state, so move the
+        // container to CREATING state; this should eventually
+        // time out and the container will be moved to DELETING state.
+        LOG.debug("Closing container {} in {} state", containerID, state);
         containerManager.updateContainerState(containerID.getId(),
             HddsProtos.LifeCycleEvent.CREATE);
-      } catch (IOException ex) {
-        LOG.error("Failed to update the container state to CREATE for"
-            + "container:{}" + containerID, ex);
+        break;
+      case CREATING:
+        // We cannot close a container in CREATING state; it will eventually
+        // time out and be moved to DELETING state.
+        LOG.debug("Closing container {} in {} state", containerID, state);
+        break;
+      case OPEN:
+        containerManager.updateContainerState(containerID.getId(),
+            HddsProtos.LifeCycleEvent.FINALIZE);
+        fireCloseContainerEvents(containerWithPipeline, info, publisher);
+        break;
+      case CLOSING:
+        fireCloseContainerEvents(containerWithPipeline, info, publisher);
+        break;
+      case CLOSED:
+      case DELETING:
+      case DELETED:
+        LOG.info(
+            "container with id : {} is in {} state and need not be closed.",
+            containerID.getId(), info.getState());
+        break;
+      default:
+        throw new IOException(
+            "Invalid container state for container " + containerID);
       }
-    } else {
-      LOG.info("container with id : {} is in {} state and need not be closed.",
-          containerID.getId(), info.getState());
+    } catch (IOException ex) {
+      LOG.error("Failed to update the container state for" + "container : {}"
+          + containerID, ex);
     }
+  }
+
+  private void fireCloseContainerEvents(
+      ContainerWithPipeline containerWithPipeline, ContainerInfo info,
+      EventPublisher publisher) {
+    ContainerID containerID = info.containerID();
+    // fire events.
+    CloseContainerCommand closeContainerCommand =
+        new CloseContainerCommand(containerID.getId(),
+            info.getReplicationType(), info.getPipelineID());
 
+    Pipeline pipeline = containerWithPipeline.getPipeline();
+    pipeline.getMachines().stream().map(
+        datanode -> new CommandForDatanode<>(datanode.getUuid(),
+            closeContainerCommand)).forEach((command) -> {
+              publisher.fireEvent(DATANODE_COMMAND, command);
+            });
+    publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ,
+        new CloseContainerRetryableReq(containerID));
+    LOG.trace("Issuing {} on Pipeline {} for container", closeContainerCommand,
+        pipeline, containerID);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/61a4b07b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index ed9c54d..8c52847 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -132,7 +132,6 @@ public class TestCloseContainerByPipeline {
     // Make sure the closeContainerCommandHandler is Invoked
     Assert.assertTrue(
         closeContainerHandler.getInvocationCount() > lastInvocationCount);
-
   }
 
   @Test
@@ -190,6 +189,7 @@ public class TestCloseContainerByPipeline {
     Assert.assertFalse((logCapturer.getOutput().contains(
         "submitting CloseContainer request over RATIS server for container "
             + containerID)));
+    logCapturer.stopCapturing();
   }
 
   @Test
@@ -239,13 +239,14 @@ public class TestCloseContainerByPipeline {
       Assert.assertTrue(isContainerClosed(cluster,
           containerID, datanodeDetails));
     }
+    // Make sure it was really closed via Ratis not STAND_ALONE server
     Assert.assertFalse(logCapturer.getOutput().contains(
         "submitting CloseContainer request over STAND_ALONE "
             + "server for container " + containerID));
-    // Make sure it was really closed via StandAlone not Ratis server
     Assert.assertTrue((logCapturer.getOutput().contains(
         "submitting CloseContainer request over RATIS server for container "
             + containerID)));
+    logCapturer.stopCapturing();
   }
 
   private Boolean isContainerClosed(MiniOzoneCluster cluster, long containerID,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org