You are viewing a plain-text version of this content; the hyperlink to its canonical version is not preserved in this rendering.
Posted to common-commits@hadoop.apache.org by na...@apache.org on 2018/09/19 16:14:07 UTC
hadoop git commit: HDDS-461. Container remains in CLOSING state in SCM forever. Contributed by Shashikant Banerjee.
Repository: hadoop
Updated Branches:
refs/heads/trunk b3c5221f3 -> 61a4b07bd
HDDS-461. Container remains in CLOSING state in SCM forever. Contributed by Shashikant Banerjee.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/61a4b07b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/61a4b07b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/61a4b07b
Branch: refs/heads/trunk
Commit: 61a4b07bdaf68d45c7c0bba0485d95a1d908a73d
Parents: b3c5221
Author: Nanda kumar <na...@apache.org>
Authored: Wed Sep 19 21:43:44 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Wed Sep 19 21:43:44 2018 +0530
----------------------------------------------------------------------
.../common/statemachine/StateContext.java | 42 +++++--
.../CloseContainerCommandHandler.java | 17 ++-
.../transport/server/XceiverServerGrpc.java | 12 +-
.../server/ratis/XceiverServerRatis.java | 31 +++--
.../container/ozoneimpl/OzoneContainer.java | 116 ++++++-------------
.../commands/CloseContainerCommand.java | 23 +---
.../container/CloseContainerEventHandler.java | 90 ++++++++------
.../TestCloseContainerByPipeline.java | 5 +-
8 files changed, 175 insertions(+), 161 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61a4b07b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 9d5a778..47c2492 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -16,6 +16,7 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine;
+import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
@@ -50,7 +52,6 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
@@ -206,9 +207,18 @@ public class StateContext {
* @return List<reports>
*/
public List<GeneratedMessage> getReports(int maxLimit) {
+ List<GeneratedMessage> reportList = new ArrayList<>();
synchronized (reports) {
- return reports.parallelStream().limit(maxLimit)
- .collect(Collectors.toList());
+ if (!reports.isEmpty()) {
+ int size = reports.size();
+ int limit = size > maxLimit ? maxLimit : size;
+ for (int count = 0; count < limit; count++) {
+ GeneratedMessage report = reports.poll();
+ Preconditions.checkNotNull(report);
+ reportList.add(report);
+ }
+ }
+ return reportList;
}
}
@@ -254,9 +264,20 @@ public class StateContext {
* @return List<ContainerAction>
*/
public List<ContainerAction> getPendingContainerAction(int maxLimit) {
+ List<ContainerAction> containerActionList = new ArrayList<>();
synchronized (containerActions) {
- return containerActions.parallelStream().limit(maxLimit)
- .collect(Collectors.toList());
+ if (!containerActions.isEmpty()) {
+ int size = containerActions.size();
+ int limit = size > maxLimit ? maxLimit : size;
+ for (int count = 0; count < limit; count++) {
+ // we need to remove the action from the containerAction queue
+ // as well
+ ContainerAction action = containerActions.poll();
+ Preconditions.checkNotNull(action);
+ containerActionList.add(action);
+ }
+ }
+ return containerActionList;
}
}
@@ -295,9 +316,16 @@ public class StateContext {
* @return List<ContainerAction>
*/
public List<PipelineAction> getPendingPipelineAction(int maxLimit) {
+ List<PipelineAction> pipelineActionList = new ArrayList<>();
synchronized (pipelineActions) {
- return pipelineActions.parallelStream().limit(maxLimit)
- .collect(Collectors.toList());
+ if (!pipelineActions.isEmpty()) {
+ int size = pipelineActions.size();
+ int limit = size > maxLimit ? maxLimit : size;
+ for (int count = 0; count < limit; count++) {
+ pipelineActionList.add(pipelineActions.poll());
+ }
+ }
+ return pipelineActionList;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61a4b07b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 030a357..d4e13ee 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
+import org.apache.ratis.protocol.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -89,9 +90,21 @@ public class CloseContainerCommandHandler implements CommandHandler {
// submit the close container request for the XceiverServer to handle
container.submitContainerRequest(
request.build(), replicationType, pipelineID);
- cmdExecuted = true;
} catch (Exception e) {
- LOG.error("Can't close container " + containerID, e);
+ if (e instanceof NotLeaderException) {
+ // If the particular datanode is not the Ratis leader, the close
+ // container command will not be executed by the follower but will be
+ // executed by Ratis stateMachine transactions via leader to follower.
+ // There can also be case where the datanode is in candidate state.
+ // In these situations, NotLeaderException is thrown. Remove the status
+ // from cmdStatus Map here so that it will be retried only by SCM if the
+ // leader could not not close the container after a certain time.
+ context.removeCommandStatus(containerID);
+ LOG.info(e.getLocalizedMessage());
+ } else {
+ LOG.error("Can't close container " + containerID, e);
+ cmdExecuted = false;
+ }
} finally {
updateCommandStatus(context, command, cmdExecuted, LOG);
long endTime = Time.monotonicNow();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61a4b07b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 83e742c..c51da98 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -21,12 +21,15 @@ package org.apache.hadoop.ozone.container.common.transport.server;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
.ContainerCommandRequestProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.
+ StorageContainerException;
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -128,8 +131,13 @@ public final class XceiverServerGrpc implements XceiverServerSpi {
@Override
public void submitRequest(ContainerCommandRequestProto request,
- HddsProtos.PipelineID pipelineID) {
- storageContainer.dispatch(request);
+ HddsProtos.PipelineID pipelineID) throws IOException {
+ ContainerProtos.ContainerCommandResponseProto response =
+ storageContainer.dispatch(request);
+ if (response.getResult() != ContainerProtos.Result.SUCCESS) {
+ throw new StorageContainerException(response.getMessage(),
+ response.getResult());
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61a4b07b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index d88995b..c2ef504 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -74,7 +74,6 @@ import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@@ -319,39 +318,35 @@ public final class XceiverServerRatis implements XceiverServerSpi {
return server;
}
- private void processReply(RaftClientReply reply) {
-
+ private void processReply(RaftClientReply reply) throws IOException {
// NotLeader exception is thrown only when the raft server to which the
// request is submitted is not the leader. The request will be rejected
- // and will eventually be executed once the request comnes via the leader
+ // and will eventually be executed once the request comes via the leader
// node.
NotLeaderException notLeaderException = reply.getNotLeaderException();
if (notLeaderException != null) {
- LOG.info(reply.getNotLeaderException().getLocalizedMessage());
+ throw notLeaderException;
}
StateMachineException stateMachineException =
reply.getStateMachineException();
if (stateMachineException != null) {
- // In case the request could not be completed, StateMachine Exception
- // will be thrown. For now, Just log the message.
- // If the container could not be closed, SCM will come to know
- // via containerReports. CloseContainer should be re tried via SCM.
- LOG.error(stateMachineException.getLocalizedMessage());
+ throw stateMachineException;
}
}
@Override
- public void submitRequest(
- ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID)
- throws IOException {
- // ReplicationLevel.MAJORITY ensures the transactions corresponding to
- // the request here are applied on all the raft servers.
+ public void submitRequest(ContainerCommandRequestProto request,
+ HddsProtos.PipelineID pipelineID) throws IOException {
+ RaftClientReply reply;
RaftClientRequest raftClientRequest =
createRaftClientRequest(request, pipelineID,
RaftClientRequest.writeRequestType(replicationLevel));
- CompletableFuture<RaftClientReply> reply =
- server.submitClientRequestAsync(raftClientRequest);
- reply.thenAccept(this::processReply);
+ try {
+ reply = server.submitClientRequestAsync(raftClientRequest).get();
+ } catch (Exception e) {
+ throw new IOException(e.getMessage(), e);
+ }
+ processReply(reply);
}
private RaftClientRequest createRaftClientRequest(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61a4b07b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index ebacf75..da58772 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -23,7 +23,8 @@ import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
@@ -46,12 +47,14 @@ import org.slf4j.LoggerFactory;
import java.io.*;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.Map;
import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT;
/**
- * Ozone main class sets up the network server and initializes the container
+ * Ozone main class sets up the network servers and initializes the container
* layer.
*/
public class OzoneContainer {
@@ -64,7 +67,7 @@ public class OzoneContainer {
private final OzoneConfiguration config;
private final VolumeSet volumeSet;
private final ContainerSet containerSet;
- private final XceiverServerSpi[] server;
+ private final Map<ReplicationType, XceiverServerSpi> servers;
/**
* Construct OzoneContainer object.
@@ -82,14 +85,13 @@ public class OzoneContainer {
buildContainerSet();
hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet,
context);
- server = new XceiverServerSpi[]{
- new XceiverServerGrpc(datanodeDetails, this.config, this
- .hddsDispatcher, createReplicationService()),
- XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this
- .config, hddsDispatcher, context)
- };
-
-
+ servers = new HashMap<>();
+ servers.put(ReplicationType.STAND_ALONE,
+ new XceiverServerGrpc(datanodeDetails, config, hddsDispatcher,
+ createReplicationService()));
+ servers.put(ReplicationType.RATIS, XceiverServerRatis
+ .newXceiverServerRatis(datanodeDetails, config, hddsDispatcher,
+ context));
}
private GrpcReplicationService createReplicationService() {
@@ -133,7 +135,7 @@ public class OzoneContainer {
*/
public void start() throws IOException {
LOG.info("Attempting to start container services.");
- for (XceiverServerSpi serverinstance : server) {
+ for (XceiverServerSpi serverinstance : servers.values()) {
serverinstance.start();
}
hddsDispatcher.init();
@@ -145,7 +147,7 @@ public class OzoneContainer {
public void stop() {
//TODO: at end of container IO integration work.
LOG.info("Attempting to stop container services.");
- for(XceiverServerSpi serverinstance: server) {
+ for(XceiverServerSpi serverinstance: servers.values()) {
serverinstance.stop();
}
hddsDispatcher.shutdown();
@@ -169,7 +171,7 @@ public class OzoneContainer {
public PipelineReportsProto getPipelineReport() {
PipelineReportsProto.Builder pipelineReportsProto =
PipelineReportsProto.newBuilder();
- for (XceiverServerSpi serverInstance : server) {
+ for (XceiverServerSpi serverInstance : servers.values()) {
pipelineReportsProto
.addAllPipelineReport(serverInstance.getPipelineReport());
}
@@ -181,82 +183,38 @@ public class OzoneContainer {
* @param request
* @param replicationType
* @param pipelineID
- * @throws IOException
*/
public void submitContainerRequest(
ContainerProtos.ContainerCommandRequestProto request,
- HddsProtos.ReplicationType replicationType,
- HddsProtos.PipelineID pipelineID) throws IOException {
- XceiverServerSpi serverInstance;
- long containerId = getContainerIdForCmd(request);
- if (replicationType == HddsProtos.ReplicationType.RATIS) {
- serverInstance = getRatisSerer();
- Preconditions.checkNotNull(serverInstance);
- serverInstance.submitRequest(request, pipelineID);
- LOG.info("submitting {} request over RATIS server for container {}",
- request.getCmdType(), containerId);
- } else {
- serverInstance = getStandaAloneSerer();
- Preconditions.checkNotNull(serverInstance);
- getStandaAloneSerer().submitRequest(request, pipelineID);
- LOG.info(
- "submitting {} request over STAND_ALONE server for container {}",
- request.getCmdType(), containerId);
+ ReplicationType replicationType,
+ PipelineID pipelineID) throws IOException {
+ if (containerSet.getContainer(request.getContainerID())
+ .getContainerData().isClosed()) {
+ LOG.debug("Container {} is already closed", request.getContainerID());
+ // It might happen that the where the first attempt of closing the
+ // container failed with NOT_LEADER_EXCEPTION. In such cases, SCM will
+ // retry to check the container got really closed via Ratis.
+ // In such cases of the retry attempt, if the container is already closed
+ // via Ratis, we should just return.
}
-
+ LOG.info("submitting {} request over {} server for container {}",
+ request.getCmdType(), replicationType, request.getContainerID());
+ Preconditions.checkState(servers.containsKey(replicationType));
+ servers.get(replicationType).submitRequest(request, pipelineID);
}
- private long getContainerIdForCmd(
- ContainerProtos.ContainerCommandRequestProto request)
- throws IllegalArgumentException {
- ContainerProtos.Type type = request.getCmdType();
- switch (type) {
- case CloseContainer:
- return request.getContainerID();
- // Right now, we handle only closeContainer via queuing it over the
- // over the XceiVerServer. For all other commands we throw Illegal
- // argument exception here. Will need to extend the switch cases
- // in case we want add another commands here.
- default:
- throw new IllegalArgumentException("Cmd " + request.getCmdType()
- + " not supported over HearBeat Response");
- }
- }
-
- private XceiverServerSpi getRatisSerer() {
- for (XceiverServerSpi serverInstance : server) {
- if (serverInstance instanceof XceiverServerRatis) {
- return serverInstance;
- }
- }
- return null;
- }
-
- private XceiverServerSpi getStandaAloneSerer() {
- for (XceiverServerSpi serverInstance : server) {
- if (!(serverInstance instanceof XceiverServerRatis)) {
- return serverInstance;
- }
- }
- return null;
- }
-
- private int getPortbyType(HddsProtos.ReplicationType replicationType) {
- for (XceiverServerSpi serverinstance : server) {
- if (serverinstance.getServerType() == replicationType) {
- return serverinstance.getIPCPort();
- }
- }
- return INVALID_PORT;
+ private int getPortByType(ReplicationType replicationType) {
+ return servers.containsKey(replicationType) ?
+ servers.get(replicationType).getIPCPort() : INVALID_PORT;
}
/**
- * Returns the container server IPC port.
+ * Returns the container servers IPC port.
*
- * @return Container server IPC port.
+ * @return Container servers IPC port.
*/
public int getContainerServerPort() {
- return getPortbyType(HddsProtos.ReplicationType.STAND_ALONE);
+ return getPortByType(ReplicationType.STAND_ALONE);
}
/**
@@ -265,7 +223,7 @@ public class OzoneContainer {
* @return Ratis port.
*/
public int getRatisContainerServerPort() {
- return getPortbyType(HddsProtos.ReplicationType.RATIS);
+ return getPortByType(ReplicationType.RATIS);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61a4b07b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
index aaa5f11..c2c20a4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
@@ -30,25 +30,13 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
public class CloseContainerCommand
extends SCMCommand<CloseContainerCommandProto> {
- private long containerID;
private HddsProtos.ReplicationType replicationType;
private PipelineID pipelineID;
public CloseContainerCommand(long containerID,
HddsProtos.ReplicationType replicationType,
PipelineID pipelineID) {
- super();
- this.containerID = containerID;
- this.replicationType = replicationType;
- this.pipelineID = pipelineID;
- }
-
- // Should be called only for protobuf conversion
- private CloseContainerCommand(long containerID,
- HddsProtos.ReplicationType replicationType,
- PipelineID pipelineID, long id) {
- super(id);
- this.containerID = containerID;
+ super(containerID);
this.replicationType = replicationType;
this.pipelineID = pipelineID;
}
@@ -75,7 +63,7 @@ public class CloseContainerCommand
public CloseContainerCommandProto getProto() {
return CloseContainerCommandProto.newBuilder()
- .setContainerID(containerID)
+ .setContainerID(getId())
.setCmdId(getId())
.setReplicationType(replicationType)
.setPipelineID(pipelineID.getProtobuf())
@@ -85,13 +73,12 @@ public class CloseContainerCommand
public static CloseContainerCommand getFromProtobuf(
CloseContainerCommandProto closeContainerProto) {
Preconditions.checkNotNull(closeContainerProto);
- return new CloseContainerCommand(closeContainerProto.getContainerID(),
+ return new CloseContainerCommand(closeContainerProto.getCmdId(),
closeContainerProto.getReplicationType(),
- PipelineID.getFromProtobuf(closeContainerProto.getPipelineID()),
- closeContainerProto.getCmdId());
+ PipelineID.getFromProtobuf(closeContainerProto.getPipelineID()));
}
public long getContainerID() {
- return containerID;
+ return getId();
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61a4b07b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index b94ce4f..7baecc4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -17,10 +17,10 @@
package org.apache.hadoop.hdds.scm.container;
import java.io.IOException;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
@@ -57,7 +57,7 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
LOG.info("Close container Event triggered for container : {}",
containerID.getId());
- ContainerWithPipeline containerWithPipeline = null;
+ ContainerWithPipeline containerWithPipeline;
ContainerInfo info;
try {
containerWithPipeline =
@@ -74,42 +74,66 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
return;
}
- if (info.getState() == HddsProtos.LifeCycleState.OPEN) {
- for (DatanodeDetails datanode :
- containerWithPipeline.getPipeline().getMachines()) {
- CommandForDatanode closeContainerCommand = new CommandForDatanode<>(
- datanode.getUuid(),
- new CloseContainerCommand(containerID.getId(),
- info.getReplicationType(), info.getPipelineID()));
- publisher.fireEvent(DATANODE_COMMAND, closeContainerCommand);
- publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ, new
- CloseContainerRetryableReq(containerID));
- }
- try {
- // Finalize event will make sure the state of the container transitions
- // from OPEN to CLOSING in containerStateManager.
- containerManager.updateContainerState(containerID.getId(),
- HddsProtos.LifeCycleEvent.FINALIZE);
- } catch (IOException ex) {
- LOG.error("Failed to update the container state to FINALIZE for"
- + "container : {}" + containerID, ex);
- }
- } else if (info.getState() == HddsProtos.LifeCycleState.ALLOCATED) {
- try {
- // Create event will make sure the state of the container transitions
- // from OPEN to CREATING in containerStateManager, this will move
- // the container out of active allocation path.
+ HddsProtos.LifeCycleState state = info.getState();
+ try {
+ switch (state) {
+ case ALLOCATED:
+ // We cannot close a container in ALLOCATED state, moving the
+ // container to CREATING state, this should eventually
+ // timeout and the container will be moved to DELETING state.
+ LOG.debug("Closing container {} in {} state", containerID, state);
containerManager.updateContainerState(containerID.getId(),
HddsProtos.LifeCycleEvent.CREATE);
- } catch (IOException ex) {
- LOG.error("Failed to update the container state to CREATE for"
- + "container:{}" + containerID, ex);
+ break;
+ case CREATING:
+ // We cannot close a container in CREATING state, it will eventually
+ // timeout and moved to DELETING state.
+ LOG.debug("Closing container {} in {} state", containerID, state);
+ break;
+ case OPEN:
+ containerManager.updateContainerState(containerID.getId(),
+ HddsProtos.LifeCycleEvent.FINALIZE);
+ fireCloseContainerEvents(containerWithPipeline, info, publisher);
+ break;
+ case CLOSING:
+ fireCloseContainerEvents(containerWithPipeline, info, publisher);
+ break;
+ case CLOSED:
+ case DELETING:
+ case DELETED:
+ LOG.info(
+ "container with id : {} is in {} state and need not be closed.",
+ containerID.getId(), info.getState());
+ break;
+ default:
+ throw new IOException(
+ "Invalid container state for container " + containerID);
}
- } else {
- LOG.info("container with id : {} is in {} state and need not be closed.",
- containerID.getId(), info.getState());
+ } catch (IOException ex) {
+ LOG.error("Failed to update the container state for" + "container : {}"
+ + containerID, ex);
}
+ }
+
+ private void fireCloseContainerEvents(
+ ContainerWithPipeline containerWithPipeline, ContainerInfo info,
+ EventPublisher publisher) {
+ ContainerID containerID = info.containerID();
+ // fire events.
+ CloseContainerCommand closeContainerCommand =
+ new CloseContainerCommand(containerID.getId(),
+ info.getReplicationType(), info.getPipelineID());
+ Pipeline pipeline = containerWithPipeline.getPipeline();
+ pipeline.getMachines().stream().map(
+ datanode -> new CommandForDatanode<>(datanode.getUuid(),
+ closeContainerCommand)).forEach((command) -> {
+ publisher.fireEvent(DATANODE_COMMAND, command);
+ });
+ publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ,
+ new CloseContainerRetryableReq(containerID));
+ LOG.trace("Issuing {} on Pipeline {} for container", closeContainerCommand,
+ pipeline, containerID);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/61a4b07b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index ed9c54d..8c52847 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -132,7 +132,6 @@ public class TestCloseContainerByPipeline {
// Make sure the closeContainerCommandHandler is Invoked
Assert.assertTrue(
closeContainerHandler.getInvocationCount() > lastInvocationCount);
-
}
@Test
@@ -190,6 +189,7 @@ public class TestCloseContainerByPipeline {
Assert.assertFalse((logCapturer.getOutput().contains(
"submitting CloseContainer request over RATIS server for container "
+ containerID)));
+ logCapturer.stopCapturing();
}
@Test
@@ -239,13 +239,14 @@ public class TestCloseContainerByPipeline {
Assert.assertTrue(isContainerClosed(cluster,
containerID, datanodeDetails));
}
+ // Make sure it was really closed via Ratis not STAND_ALONE server
Assert.assertFalse(logCapturer.getOutput().contains(
"submitting CloseContainer request over STAND_ALONE "
+ "server for container " + containerID));
- // Make sure it was really closed via StandAlone not Ratis server
Assert.assertTrue((logCapturer.getOutput().contains(
"submitting CloseContainer request over RATIS server for container "
+ containerID)));
+ logCapturer.stopCapturing();
}
private Boolean isContainerClosed(MiniOzoneCluster cluster, long containerID,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org