You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2018/09/18 02:06:12 UTC
[36/50] [abbrv] hadoop git commit: HDDS-399. Persist open pipeline
information across SCM restart. Contributed by Mukul Kumar Singh.
HDDS-399. Persist open pipeline information across SCM restart. Contributed by Mukul Kumar Singh.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/84693669
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/84693669
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/84693669
Branch: refs/heads/HDFS-12943
Commit: 846936698b2c8c50662e43534ac999df82066a8b
Parents: 9a265fa
Author: Nanda kumar <na...@apache.org>
Authored: Mon Sep 17 21:51:54 2018 +0530
Committer: Nanda kumar <na...@apache.org>
Committed: Mon Sep 17 21:51:54 2018 +0530
----------------------------------------------------------------------
.../scm/container/common/helpers/Pipeline.java | 24 ++
.../org/apache/hadoop/ozone/OzoneConsts.java | 2 +
.../hdds/scm/container/ContainerMapping.java | 24 +-
.../scm/container/ContainerStateManager.java | 25 +-
.../scm/container/states/ContainerStateMap.java | 38 ---
.../hdds/scm/pipelines/PipelineManager.java | 148 +++++------
.../hdds/scm/pipelines/PipelineSelector.java | 249 +++++++++----------
.../scm/pipelines/PipelineStateManager.java | 136 ++++++++++
.../scm/pipelines/ratis/RatisManagerImpl.java | 8 +-
.../standalone/StandaloneManagerImpl.java | 8 +-
.../container/TestContainerReportHandler.java | 3 +-
.../container/TestContainerStateManager.java | 4 +-
.../hdds/scm/node/TestDeadNodeHandler.java | 4 +-
.../TestContainerStateManagerIntegration.java | 10 +-
.../hdds/scm/pipeline/TestNode2PipelineMap.java | 22 +-
.../hdds/scm/pipeline/TestPipelineClose.java | 15 +-
.../hdds/scm/pipeline/TestSCMRestart.java | 101 ++++++++
.../apache/hadoop/ozone/MiniOzoneCluster.java | 5 +-
.../hadoop/ozone/MiniOzoneClusterImpl.java | 8 +-
19 files changed, 510 insertions(+), 324 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
index 6757262..ef148e5 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/Pipeline.java
@@ -86,6 +86,30 @@ public class Pipeline {
datanodes = new TreeMap<>();
}
+ @Override
+ public int hashCode() {
+ return id.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ Pipeline that = (Pipeline) o;
+
+ return id.equals(that.id)
+ && factor.equals(that.factor)
+ && type.equals(that.type)
+ && lifeCycleState.equals(that.lifeCycleState)
+ && leaderID.equals(that.leaderID);
+
+ }
+
/**
* Gets pipeline object from protobuf.
*
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index bf4508b..0a15ec8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -90,7 +90,9 @@ public final class OzoneConsts {
* level DB names used by SCM and data nodes.
*/
public static final String CONTAINER_DB_SUFFIX = "container.db";
+ public static final String PIPELINE_DB_SUFFIX = "pipeline.db";
public static final String SCM_CONTAINER_DB = "scm-" + CONTAINER_DB_SUFFIX;
+ public static final String SCM_PIPELINE_DB = "scm-" + PIPELINE_DB_SUFFIX;
public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX;
public static final String DELETED_BLOCK_DB = "deletedBlock.db";
public static final String OM_DB_NAME = "om.db";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 5678205..11cc9ee 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -130,12 +130,13 @@ public class ContainerMapping implements Mapping {
size = (long)conf.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
- this.containerStateManager =
- new ContainerStateManager(conf, this);
- LOG.trace("Container State Manager created.");
this.pipelineSelector = new PipelineSelector(nodeManager,
- containerStateManager, conf, eventPublisher);
+ conf, eventPublisher, cacheSizeMB);
+
+ this.containerStateManager =
+ new ContainerStateManager(conf, this, pipelineSelector);
+ LOG.trace("Container State Manager created.");
this.eventPublisher = eventPublisher;
@@ -202,11 +203,6 @@ public class ContainerMapping implements Mapping {
if (contInfo.isContainerOpen()) {
// If pipeline with given pipeline Id already exist return it
pipeline = pipelineSelector.getPipeline(contInfo.getPipelineID());
- if (pipeline == null) {
- pipeline = pipelineSelector
- .getReplicationPipeline(contInfo.getReplicationType(),
- contInfo.getReplicationFactor());
- }
} else {
// For close containers create pipeline from datanodes with replicas
Set<DatanodeDetails> dnWithReplicas = containerStateManager
@@ -392,9 +388,8 @@ public class ContainerMapping implements Mapping {
ContainerInfo updatedContainer = containerStateManager
.updateContainerState(containerInfo, event);
if (!updatedContainer.isContainerOpen()) {
- Pipeline pipeline = pipelineSelector
- .getPipeline(containerInfo.getPipelineID());
- pipelineSelector.closePipelineIfNoOpenContainers(pipeline);
+ pipelineSelector.removeContainerFromPipeline(
+ containerInfo.getPipelineID(), containerID);
}
containerStore.put(dbKey, updatedContainer.getProtobuf().toByteArray());
return updatedContainer.getState();
@@ -474,11 +469,6 @@ public class ContainerMapping implements Mapping {
}
Pipeline pipeline = pipelineSelector
.getPipeline(containerInfo.getPipelineID());
- if (pipeline == null) {
- pipeline = pipelineSelector
- .getReplicationPipeline(containerInfo.getReplicationType(),
- containerInfo.getReplicationFactor());
- }
return new ContainerWithPipeline(containerInfo, pipeline);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 7989c55..930c098 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.container.states.ContainerState;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
@@ -137,7 +136,7 @@ public class ContainerStateManager implements Closeable {
*/
@SuppressWarnings("unchecked")
public ContainerStateManager(Configuration configuration,
- Mapping containerMapping) {
+ Mapping containerMapping, PipelineSelector pipelineSelector) {
// Initialize the container state machine.
Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
@@ -159,10 +158,11 @@ public class ContainerStateManager implements Closeable {
lastUsedMap = new ConcurrentHashMap<>();
containerCount = new AtomicLong(0);
containers = new ContainerStateMap();
- loadExistingContainers(containerMapping);
+ loadExistingContainers(containerMapping, pipelineSelector);
}
- private void loadExistingContainers(Mapping containerMapping) {
+ private void loadExistingContainers(Mapping containerMapping,
+ PipelineSelector pipelineSelector) {
List<ContainerInfo> containerList;
try {
@@ -184,6 +184,8 @@ public class ContainerStateManager implements Closeable {
long maxID = 0;
for (ContainerInfo container : containerList) {
containers.addContainer(container);
+ pipelineSelector.addContainerToPipeline(
+ container.getPipelineID(), container.getContainerID());
if (maxID < container.getContainerID()) {
maxID = container.getContainerID();
@@ -303,6 +305,7 @@ public class ContainerStateManager implements Closeable {
+ "replication=%s couldn't be found for the new container. "
+ "Do you have enough nodes?", type, replicationFactor);
+ long containerID = containerCount.incrementAndGet();
ContainerInfo containerInfo = new ContainerInfo.Builder()
.setState(HddsProtos.LifeCycleState.ALLOCATED)
.setPipelineID(pipeline.getId())
@@ -313,11 +316,12 @@ public class ContainerStateManager implements Closeable {
.setNumberOfKeys(0)
.setStateEnterTime(Time.monotonicNow())
.setOwner(owner)
- .setContainerID(containerCount.incrementAndGet())
+ .setContainerID(containerID)
.setDeleteTransactionId(0)
.setReplicationFactor(replicationFactor)
.setReplicationType(pipeline.getType())
.build();
+ selector.addContainerToPipeline(pipeline.getId(), containerID);
Preconditions.checkNotNull(containerInfo);
containers.addContainer(containerInfo);
LOG.trace("New container allocated: {}", containerInfo);
@@ -471,17 +475,6 @@ public class ContainerStateManager implements Closeable {
}
/**
- * Returns a set of open ContainerIDs that reside on a pipeline.
- *
- * @param pipelineID PipelineID of the Containers.
- * @return Set of containers that match the specific query parameters.
- */
- public NavigableSet<ContainerID> getMatchingContainerIDsByPipeline(PipelineID
- pipelineID) {
- return containers.getOpenContainerIDsByPipeline(pipelineID);
- }
-
- /**
* Returns the containerInfo with pipeline for the given container id.
* @param selector -- Pipeline selector class.
* @param containerID id of the container
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
index 9657594..880a715 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/states/ContainerStateMap.java
@@ -25,7 +25,6 @@ import java.util.Set;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
@@ -75,9 +74,6 @@ import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
* Replica and THREE Replica. User can specify how many copies should be made
* for a ozone key.
* <p>
- * 5.Pipeline - The pipeline constitute the set of Datanodes on which the
- * open container resides physically.
- * <p>
* The most common access pattern of this class is to select a container based
* on all these parameters, for example, when allocating a block we will
* select a container that belongs to user1, with Ratis replication which can
@@ -92,14 +88,6 @@ public class ContainerStateMap {
private final ContainerAttribute<String> ownerMap;
private final ContainerAttribute<ReplicationFactor> factorMap;
private final ContainerAttribute<ReplicationType> typeMap;
- // This map constitutes the pipeline to open container mappings.
- // This map will be queried for the list of open containers on a particular
- // pipeline and issue a close on corresponding containers in case of
- // following events:
- //1. Dead datanode.
- //2. Datanode out of space.
- //3. Volume loss or volume out of space.
- private final ContainerAttribute<PipelineID> openPipelineMap;
private final Map<ContainerID, ContainerInfo> containerMap;
// Map to hold replicas of given container.
@@ -121,7 +109,6 @@ public class ContainerStateMap {
ownerMap = new ContainerAttribute<>();
factorMap = new ContainerAttribute<>();
typeMap = new ContainerAttribute<>();
- openPipelineMap = new ContainerAttribute<>();
containerMap = new HashMap<>();
lock = new ReentrantReadWriteLock();
contReplicaMap = new HashMap<>();
@@ -158,9 +145,6 @@ public class ContainerStateMap {
ownerMap.insert(info.getOwner(), id);
factorMap.insert(info.getReplicationFactor(), id);
typeMap.insert(info.getReplicationType(), id);
- if (info.isContainerOpen()) {
- openPipelineMap.insert(info.getPipelineID(), id);
- }
// Flush the cache of this container type, will be added later when
// get container queries are executed.
@@ -391,11 +375,6 @@ public class ContainerStateMap {
throw new SCMException("Updating the container map failed.", ex,
FAILED_TO_CHANGE_CONTAINER_STATE);
}
- // In case the container is set to closed state, it needs to be removed
- // from the pipeline Map.
- if (!info.isContainerOpen()) {
- openPipelineMap.remove(info.getPipelineID(), id);
- }
} finally {
lock.writeLock().unlock();
}
@@ -434,23 +413,6 @@ public class ContainerStateMap {
}
/**
- * Returns Open containers in the SCM by the Pipeline.
- *
- * @param pipelineID - Pipeline id.
- * @return NavigableSet<ContainerID>
- */
- public NavigableSet<ContainerID> getOpenContainerIDsByPipeline(
- PipelineID pipelineID) {
- Preconditions.checkNotNull(pipelineID);
- lock.readLock().lock();
- try {
- return openPipelineMap.getCollection(pipelineID);
- } finally {
- lock.readLock().unlock();
- }
- }
-
- /**
* Returns Containers by replication factor.
*
* @param factor - Replication Factor.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
index 102df8a..07ff2b0 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineManager.java
@@ -16,11 +16,10 @@
*/
package org.apache.hadoop.hdds.scm.pipelines;
+import java.util.ArrayList;
import java.util.LinkedList;
-import java.util.Map;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
@@ -37,17 +36,53 @@ import java.util.concurrent.atomic.AtomicInteger;
public abstract class PipelineManager {
private static final Logger LOG =
LoggerFactory.getLogger(PipelineManager.class);
- private final List<PipelineID> activePipelines;
- private final Map<PipelineID, Pipeline> pipelineMap;
- private final AtomicInteger pipelineIndex;
- private final Node2PipelineMap node2PipelineMap;
-
- public PipelineManager(Node2PipelineMap map,
- Map<PipelineID, Pipeline> pipelineMap) {
- activePipelines = new LinkedList<>();
- pipelineIndex = new AtomicInteger(0);
- this.pipelineMap = pipelineMap;
- this.node2PipelineMap = map;
+ private final ArrayList<ActivePipelines> activePipelines;
+
+ public PipelineManager() {
+ activePipelines = new ArrayList<>();
+ for (ReplicationFactor factor : ReplicationFactor.values()) {
+ activePipelines.add(factor.ordinal(), new ActivePipelines());
+ }
+ }
+
+ private static class ActivePipelines {
+ private final List<PipelineID> activePipelines;
+ private final AtomicInteger pipelineIndex;
+
+ ActivePipelines() {
+ activePipelines = new LinkedList<>();
+ pipelineIndex = new AtomicInteger(0);
+ }
+
+ void addPipeline(PipelineID pipelineID) {
+ activePipelines.add(pipelineID);
+ }
+
+ void removePipeline(PipelineID pipelineID) {
+ activePipelines.remove(pipelineID);
+ }
+
+ /**
+ * Finds the ID of an active pipeline, picked in round-robin order.
+ *
+ * @return - PipelineID, or null if there are no active pipelines
+ */
+ PipelineID findOpenPipeline() {
+ if (activePipelines.size() == 0) {
+ LOG.error("No Operational pipelines found. Returning null.");
+ return null;
+ }
+ return activePipelines.get(getNextIndex());
+ }
+
+ /**
+ * Gets the index of the next pipeline to return, advancing the counter.
+ *
+ * @return index into the list of active pipelines.
+ */
+ private int getNextIndex() {
+ return pipelineIndex.incrementAndGet() % activePipelines.size();
+ }
}
/**
@@ -59,44 +94,30 @@ public abstract class PipelineManager {
* @param replicationFactor - Replication Factor
* @return a Pipeline.
*/
- public synchronized final Pipeline getPipeline(
+ public synchronized final PipelineID getPipeline(
ReplicationFactor replicationFactor, ReplicationType replicationType) {
- Pipeline pipeline = findOpenPipeline(replicationType, replicationFactor);
- if (pipeline != null) {
+ PipelineID id =
+ activePipelines.get(replicationFactor.ordinal()).findOpenPipeline();
+ if (id != null) {
LOG.debug("re-used pipeline:{} for container with " +
"replicationType:{} replicationFactor:{}",
- pipeline.getId(), replicationType, replicationFactor);
+ id, replicationType, replicationFactor);
}
- if (pipeline == null) {
+ if (id == null) {
LOG.error("Get pipeline call failed. We are not able to find" +
" operational pipeline.");
return null;
} else {
- return pipeline;
+ return id;
}
}
- /**
- * This function to get pipeline with given pipeline name.
- *
- * @param id
- * @return a Pipeline.
- */
- public synchronized final Pipeline getPipeline(PipelineID id) {
- Pipeline pipeline = null;
-
- // 1. Check if pipeline already exists
- if (pipelineMap.containsKey(id)) {
- pipeline = pipelineMap.get(id);
- LOG.debug("Returning pipeline for pipelineName:{}", id);
- return pipeline;
- } else {
- LOG.debug("Unable to find pipeline for pipelineName:{}", id);
- }
- return pipeline;
+ void addOpenPipeline(Pipeline pipeline) {
+ activePipelines.get(pipeline.getFactor().ordinal())
+ .addPipeline(pipeline.getId());
}
- protected int getReplicationCount(ReplicationFactor factor) {
+ protected static int getReplicationCount(ReplicationFactor factor) {
switch (factor) {
case ONE:
return 1;
@@ -117,46 +138,6 @@ public abstract class PipelineManager {
public abstract void initializePipeline(Pipeline pipeline) throws IOException;
/**
- * Find a Pipeline that is operational.
- *
- * @return - Pipeline or null
- */
- private Pipeline findOpenPipeline(
- ReplicationType type, ReplicationFactor factor) {
- Pipeline pipeline = null;
- final int sentinal = -1;
- if (activePipelines.size() == 0) {
- LOG.error("No Operational pipelines found. Returning null.");
- return null;
- }
- int startIndex = getNextIndex();
- int nextIndex = sentinal;
- for (; startIndex != nextIndex; nextIndex = getNextIndex()) {
- // Just walk the list in a circular way.
- PipelineID id =
- activePipelines
- .get(nextIndex != sentinal ? nextIndex : startIndex);
- Pipeline temp = pipelineMap.get(id);
- // if we find an operational pipeline just return that.
- if ((temp.getLifeCycleState() == LifeCycleState.OPEN) &&
- (temp.getFactor() == factor) && (temp.getType() == type)) {
- pipeline = temp;
- break;
- }
- }
- return pipeline;
- }
-
- /**
- * gets the next index of the Pipeline to get.
- *
- * @return index in the link list to get.
- */
- private int getNextIndex() {
- return pipelineIndex.incrementAndGet() % activePipelines.size();
- }
-
- /**
* Creates a pipeline with a specified replication factor and type.
* @param replicationFactor - Replication Factor.
* @param replicationType - Replication Type.
@@ -168,9 +149,6 @@ public abstract class PipelineManager {
LOG.debug("created new pipeline:{} for container with "
+ "replicationType:{} replicationFactor:{}",
pipeline.getId(), replicationType, replicationFactor);
- activePipelines.add(pipeline.getId());
- pipelineMap.put(pipeline.getId(), pipeline);
- node2PipelineMap.addPipeline(pipeline);
}
return pipeline;
}
@@ -180,17 +158,15 @@ public abstract class PipelineManager {
* @param pipeline pipeline to be finalized
*/
public synchronized void finalizePipeline(Pipeline pipeline) {
- activePipelines.remove(pipeline.getId());
+ activePipelines.get(pipeline.getFactor().ordinal())
+ .removePipeline(pipeline.getId());
}
/**
*
* @param pipeline
*/
- public void closePipeline(Pipeline pipeline) throws IOException {
- pipelineMap.remove(pipeline.getId());
- node2PipelineMap.removePipeline(pipeline);
- }
+ public abstract void closePipeline(Pipeline pipeline) throws IOException;
/**
* list members in the pipeline.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
index 63afbaa..c9f51f7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineSelector.java
@@ -21,7 +21,6 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerStateManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
@@ -39,30 +38,34 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.common.statemachine
- .InvalidStateTransitionException;
-import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.lease.Lease;
import org.apache.hadoop.ozone.lease.LeaseException;
import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.io.IOException;
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.List;
-import java.util.NavigableSet;
+import java.util.HashMap;
import java.util.Set;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
.FAILED_TO_CHANGE_PIPELINE_STATE;
+import static org.apache.hadoop.hdds.server
+ .ServerUtils.getOzoneMetaDirPath;
+import static org.apache.hadoop.ozone
+ .OzoneConsts.SCM_PIPELINE_DB;
/**
* Sends the request to the right pipeline manager.
@@ -73,16 +76,16 @@ public class PipelineSelector {
private final ContainerPlacementPolicy placementPolicy;
private final NodeManager nodeManager;
private final Configuration conf;
- private final ContainerStateManager containerStateManager;
private final EventPublisher eventPublisher;
private final RatisManagerImpl ratisManager;
private final StandaloneManagerImpl standaloneManager;
private final long containerSize;
+ private final MetadataStore pipelineStore;
+ private final PipelineStateManager stateManager;
private final Node2PipelineMap node2PipelineMap;
+ private final Map<PipelineID, HashSet<ContainerID>> pipeline2ContainerMap;
private final Map<PipelineID, Pipeline> pipelineMap;
private final LeaseManager<Pipeline> pipelineLeaseManager;
- private final StateMachine<LifeCycleState,
- HddsProtos.LifeCycleEvent> stateMachine;
/**
* Constructs a pipeline Selector.
@@ -90,9 +93,8 @@ public class PipelineSelector {
* @param nodeManager - node manager
* @param conf - Ozone Config
*/
- public PipelineSelector(NodeManager nodeManager,
- ContainerStateManager containerStateManager, Configuration conf,
- EventPublisher eventPublisher) {
+ public PipelineSelector(NodeManager nodeManager, Configuration conf,
+ EventPublisher eventPublisher, int cacheSizeMB) throws IOException {
this.nodeManager = nodeManager;
this.conf = conf;
this.eventPublisher = eventPublisher;
@@ -105,79 +107,66 @@ public class PipelineSelector {
pipelineMap = new ConcurrentHashMap<>();
this.standaloneManager =
new StandaloneManagerImpl(this.nodeManager, placementPolicy,
- containerSize, node2PipelineMap, pipelineMap);
+ containerSize);
this.ratisManager =
new RatisManagerImpl(this.nodeManager, placementPolicy, containerSize,
- conf, node2PipelineMap, pipelineMap);
- // Initialize the container state machine.
- Set<HddsProtos.LifeCycleState> finalStates = new HashSet();
+ conf);
long pipelineCreationLeaseTimeout = conf.getTimeDuration(
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_LEASE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
- this.containerStateManager = containerStateManager;
pipelineLeaseManager = new LeaseManager<>("PipelineCreation",
pipelineCreationLeaseTimeout);
pipelineLeaseManager.start();
- // These are the steady states of a container.
- finalStates.add(HddsProtos.LifeCycleState.OPEN);
- finalStates.add(HddsProtos.LifeCycleState.CLOSED);
+ stateManager = new PipelineStateManager();
+ pipeline2ContainerMap = new HashMap<>();
- this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
- finalStates);
- initializeStateMachine();
+ // Build the pipeline DB store under the Ozone metadata directory.
+ File metaDir = getOzoneMetaDirPath(conf);
+ File containerDBPath = new File(metaDir, SCM_PIPELINE_DB);
+ pipelineStore = MetadataStoreBuilder.newBuilder()
+ .setConf(conf)
+ .setDbFile(containerDBPath)
+ .setCacheSize(cacheSizeMB * OzoneConsts.MB)
+ .build();
+
+ reloadExistingPipelines();
}
- /**
- * Event and State Transition Mapping.
- *
- * State: ALLOCATED ---------------> CREATING
- * Event: CREATE
- *
- * State: CREATING ---------------> OPEN
- * Event: CREATED
- *
- * State: OPEN ---------------> CLOSING
- * Event: FINALIZE
- *
- * State: CLOSING ---------------> CLOSED
- * Event: CLOSE
- *
- * State: CREATING ---------------> CLOSED
- * Event: TIMEOUT
- *
- *
- * Container State Flow:
- *
- * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
- * (CREATE) | (CREATED) (FINALIZE) |
- * | |
- * | |
- * |(TIMEOUT) |(CLOSE)
- * | |
- * +--------> [CLOSED] <--------+
- */
- private void initializeStateMachine() {
- stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
- HddsProtos.LifeCycleState.CREATING,
- HddsProtos.LifeCycleEvent.CREATE);
-
- stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
- HddsProtos.LifeCycleState.OPEN,
- HddsProtos.LifeCycleEvent.CREATED);
-
- stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
- HddsProtos.LifeCycleState.CLOSING,
- HddsProtos.LifeCycleEvent.FINALIZE);
-
- stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
- HddsProtos.LifeCycleState.CLOSED,
- HddsProtos.LifeCycleEvent.CLOSE);
-
- stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
- HddsProtos.LifeCycleState.CLOSED,
- HddsProtos.LifeCycleEvent.TIMEOUT);
+ private void reloadExistingPipelines() throws IOException {
+ if (pipelineStore.isEmpty()) {
+ // Nothing to do just return
+ return;
+ }
+
+ List<Map.Entry<byte[], byte[]>> range =
+ pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE, null);
+
+ // Transform the values into the pipelines.
+ // TODO: filter by pipeline state
+ for (Map.Entry<byte[], byte[]> entry : range) {
+ Pipeline pipeline = Pipeline.getFromProtoBuf(
+ HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
+ Preconditions.checkNotNull(pipeline);
+ addExistingPipeline(pipeline);
+ }
+ }
+
+ public Set<ContainerID> getOpenContainerIDsByPipeline(PipelineID pipelineID) {
+ return pipeline2ContainerMap.get(pipelineID);
+ }
+
+ public void addContainerToPipeline(PipelineID pipelineID, long containerID) {
+ pipeline2ContainerMap.get(pipelineID)
+ .add(ContainerID.valueof(containerID));
+ }
+
+ public void removeContainerFromPipeline(PipelineID pipelineID,
+ long containerID) throws IOException {
+ pipeline2ContainerMap.get(pipelineID)
+ .remove(ContainerID.valueof(containerID));
+ closePipelineIfNoOpenContainers(pipelineMap.get(pipelineID));
}
/**
@@ -294,8 +283,14 @@ public class PipelineSelector {
manager.createPipeline(replicationFactor, replicationType);
if (pipeline == null) {
// try to return a pipeline from already allocated pipelines
- pipeline = manager.getPipeline(replicationFactor, replicationType);
+ PipelineID pipelineId =
+ manager.getPipeline(replicationFactor, replicationType);
+ pipeline = pipelineMap.get(pipelineId);
+ Preconditions.checkArgument(pipeline.getLifeCycleState() ==
+ LifeCycleState.OPEN);
} else {
+ pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
+ pipeline.getProtobufMessage().toByteArray());
// if a new pipeline is created, initialize its state machine
updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CREATE);
@@ -343,8 +338,8 @@ public class PipelineSelector {
if (pipeline.getLifeCycleState() != LifeCycleState.CLOSING) {
return;
}
- NavigableSet<ContainerID> containerIDS = containerStateManager
- .getMatchingContainerIDsByPipeline(pipeline.getId());
+ HashSet<ContainerID> containerIDS =
+ pipeline2ContainerMap.get(pipeline.getId());
if (containerIDS.size() == 0) {
updatePipelineState(pipeline, HddsProtos.LifeCycleEvent.CLOSE);
LOG.info("Closing pipeline. pipelineID: {}", pipeline.getId());
@@ -358,56 +353,58 @@ public class PipelineSelector {
PipelineManager manager = getPipelineManager(pipeline.getType());
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
LOG.debug("Closing pipeline. pipelineID: {}", pipeline.getId());
- NavigableSet<ContainerID> containers =
- containerStateManager
- .getMatchingContainerIDsByPipeline(pipeline.getId());
+ HashSet<ContainerID> containers =
+ pipeline2ContainerMap.get(pipeline.getId());
Preconditions.checkArgument(containers.size() == 0);
manager.closePipeline(pipeline);
}
- private void closeContainersByPipeline(Pipeline pipeline) {
- NavigableSet<ContainerID> containers =
- containerStateManager
- .getMatchingContainerIDsByPipeline(pipeline.getId());
- for (ContainerID id : containers) {
- eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id);
- }
- }
-
/**
- * list members in the pipeline .
+ * Add an open pipeline to the pipeline manager for its type.
*/
-
- public List<DatanodeDetails> getDatanodes(ReplicationType replicationType,
- PipelineID pipelineID) throws IOException {
- PipelineManager manager = getPipelineManager(replicationType);
- Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
- LOG.debug("Getting data nodes from pipeline : {}", pipelineID);
- return manager.getMembers(pipelineID);
- }
-
- /**
- * Update the datanodes in the list of the pipeline.
- */
-
- public void updateDatanodes(ReplicationType replicationType, PipelineID
- pipelineID, List<DatanodeDetails> newDatanodes) throws IOException {
- PipelineManager manager = getPipelineManager(replicationType);
+ private void addOpenPipeline(Pipeline pipeline) {
+ PipelineManager manager = getPipelineManager(pipeline.getType());
Preconditions.checkNotNull(manager, "Found invalid pipeline manager");
- LOG.debug("Updating pipeline: {} with new nodes:{}", pipelineID,
- newDatanodes.stream().map(DatanodeDetails::toString)
- .collect(Collectors.joining(",")));
- manager.updatePipeline(pipelineID, newDatanodes);
+ LOG.debug("Adding Open pipeline. pipelineID: {}", pipeline.getId());
+ manager.addOpenPipeline(pipeline);
}
- public Node2PipelineMap getNode2PipelineMap() {
- return node2PipelineMap;
+ private void closeContainersByPipeline(Pipeline pipeline) {
+ HashSet<ContainerID> containers =
+ pipeline2ContainerMap.get(pipeline.getId());
+ for (ContainerID id : containers) {
+ eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, id);
+ }
}
public Set<PipelineID> getPipelineId(UUID dnId) {
return node2PipelineMap.getPipelines(dnId);
}
+ private void addExistingPipeline(Pipeline pipeline) throws IOException {
+ LifeCycleState state = pipeline.getLifeCycleState();
+ switch (state) {
+ case ALLOCATED:
+ // a pipeline in allocated state is only present in SCM and does not exist
+ // on datanode, on SCM restart, this pipeline can be ignored.
+ break;
+ case CREATING:
+ case OPEN:
+ case CLOSING:
+ //TODO: process pipeline report and move pipeline to active queue
+ // when all the nodes have reported.
+ pipelineMap.put(pipeline.getId(), pipeline);
+ pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>());
+ node2PipelineMap.addPipeline(pipeline);
+ break;
+ case CLOSED:
+ // if the pipeline is in closed state, nothing to do.
+ break;
+ default:
+ throw new IOException("invalid pipeline state:" + state);
+ }
+ }
+
/**
* Update the Pipeline State to the next state.
*
@@ -417,24 +414,12 @@ public class PipelineSelector {
*/
public void updatePipelineState(Pipeline pipeline,
HddsProtos.LifeCycleEvent event) throws IOException {
- HddsProtos.LifeCycleState newState;
- try {
- newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
- } catch (InvalidStateTransitionException ex) {
- String error = String.format("Failed to update pipeline state %s, " +
- "reason: invalid state transition from state: %s upon " +
- "event: %s.",
- pipeline.getId(), pipeline.getLifeCycleState(), event);
- LOG.error(error);
- throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
- }
-
- // This is a post condition after executing getNextState.
- Preconditions.checkNotNull(newState);
- Preconditions.checkNotNull(pipeline);
try {
switch (event) {
case CREATE:
+ pipelineMap.put(pipeline.getId(), pipeline);
+ pipeline2ContainerMap.put(pipeline.getId(), new HashSet<>());
+ node2PipelineMap.addPipeline(pipeline);
// Acquire lease on pipeline
Lease<Pipeline> pipelineLease = pipelineLeaseManager.acquire(pipeline);
// Register callback to be executed in case of timeout
@@ -446,6 +431,7 @@ public class PipelineSelector {
case CREATED:
// Release the lease on pipeline
pipelineLeaseManager.release(pipeline);
+ addOpenPipeline(pipeline);
break;
case FINALIZE:
@@ -455,21 +441,30 @@ public class PipelineSelector {
case CLOSE:
case TIMEOUT:
closePipeline(pipeline);
+ pipeline2ContainerMap.remove(pipeline.getId());
+ node2PipelineMap.removePipeline(pipeline);
+ pipelineMap.remove(pipeline.getId());
break;
default:
throw new SCMException("Unsupported pipeline LifeCycleEvent.",
FAILED_TO_CHANGE_PIPELINE_STATE);
}
- pipeline.setLifeCycleState(newState);
+ stateManager.updatePipelineState(pipeline, event);
+ pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
+ pipeline.getProtobufMessage().toByteArray());
} catch (LeaseException e) {
throw new IOException("Lease Exception.", e);
}
}
- public void shutdown() {
+ public void shutdown() throws IOException {
if (pipelineLeaseManager != null) {
pipelineLeaseManager.shutdown();
}
+
+ if (pipelineStore != null) {
+ pipelineStore.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java
new file mode 100644
index 0000000..6054f16
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/PipelineStateManager.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipelines;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.ozone.common.statemachine
+ .InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+ .FAILED_TO_CHANGE_PIPELINE_STATE;
+
+/**
+ * Manages Pipeline states.
+ */
+public class PipelineStateManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PipelineStateManager.class);
+
+ private final StateMachine<HddsProtos.LifeCycleState,
+ HddsProtos.LifeCycleEvent> stateMachine;
+
+ PipelineStateManager() {
+ // Initialize the pipeline state machine.
+ Set<HddsProtos.LifeCycleState> finalStates = new HashSet<>();
+ // These are the steady states of a pipeline.
+ finalStates.add(HddsProtos.LifeCycleState.OPEN);
+ finalStates.add(HddsProtos.LifeCycleState.CLOSED);
+
+ this.stateMachine = new StateMachine<>(HddsProtos.LifeCycleState.ALLOCATED,
+ finalStates);
+ initializeStateMachine();
+ }
+
+ /**
+ * Event and State Transition Mapping.
+ *
+ * State: ALLOCATED ---------------> CREATING
+ * Event: CREATE
+ *
+ * State: CREATING ---------------> OPEN
+ * Event: CREATED
+ *
+ * State: OPEN ---------------> CLOSING
+ * Event: FINALIZE
+ *
+ * State: CLOSING ---------------> CLOSED
+ * Event: CLOSE
+ *
+ * State: CREATING ---------------> CLOSED
+ * Event: TIMEOUT
+ *
+ *
+ * Pipeline State Flow:
+ *
+ * [ALLOCATED]---->[CREATING]------>[OPEN]-------->[CLOSING]
+ * (CREATE) | (CREATED) (FINALIZE) |
+ * | |
+ * | |
+ * |(TIMEOUT) |(CLOSE)
+ * | |
+ * +--------> [CLOSED] <--------+
+ */
+ private void initializeStateMachine() {
+ stateMachine.addTransition(HddsProtos.LifeCycleState.ALLOCATED,
+ HddsProtos.LifeCycleState.CREATING,
+ HddsProtos.LifeCycleEvent.CREATE);
+
+ stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
+ HddsProtos.LifeCycleState.OPEN,
+ HddsProtos.LifeCycleEvent.CREATED);
+
+ stateMachine.addTransition(HddsProtos.LifeCycleState.OPEN,
+ HddsProtos.LifeCycleState.CLOSING,
+ HddsProtos.LifeCycleEvent.FINALIZE);
+
+ stateMachine.addTransition(HddsProtos.LifeCycleState.CLOSING,
+ HddsProtos.LifeCycleState.CLOSED,
+ HddsProtos.LifeCycleEvent.CLOSE);
+
+ stateMachine.addTransition(HddsProtos.LifeCycleState.CREATING,
+ HddsProtos.LifeCycleState.CLOSED,
+ HddsProtos.LifeCycleEvent.TIMEOUT);
+ }
+
+
+ /**
+ * Update the Pipeline State to the next state.
+ *
+ * @param pipeline - Pipeline
+ * @param event - LifeCycle Event
+ * @throws SCMException on Failure.
+ */
+ public void updatePipelineState(Pipeline pipeline,
+ HddsProtos.LifeCycleEvent event) throws IOException {
+ HddsProtos.LifeCycleState newState;
+ try {
+ newState = stateMachine.getNextState(pipeline.getLifeCycleState(), event);
+ } catch (InvalidStateTransitionException ex) {
+ String error = String.format("Failed to update pipeline state %s, " +
+ "reason: invalid state transition from state: %s upon " +
+ "event: %s.",
+ pipeline.getId(), pipeline.getLifeCycleState(), event);
+ LOG.error(error);
+ throw new SCMException(error, FAILED_TO_CHANGE_PIPELINE_STATE);
+ }
+
+ // This is a post condition after executing getNextState.
+ Preconditions.checkNotNull(newState);
+ Preconditions.checkNotNull(pipeline);
+ pipeline.setLifeCycleState(newState);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
index 0342e18..d3cec88 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/ratis/RatisManagerImpl.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -39,7 +38,6 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
-import java.util.Map;
/**
* Implementation of {@link PipelineManager}.
@@ -59,9 +57,8 @@ public class RatisManagerImpl extends PipelineManager {
* @param nodeManager
*/
public RatisManagerImpl(NodeManager nodeManager,
- ContainerPlacementPolicy placementPolicy, long size, Configuration conf,
- Node2PipelineMap map, Map<PipelineID, Pipeline> pipelineMap) {
- super(map, pipelineMap);
+ ContainerPlacementPolicy placementPolicy, long size, Configuration conf) {
+ super();
this.conf = conf;
this.nodeManager = nodeManager;
ratisMembers = new HashSet<>();
@@ -114,7 +111,6 @@ public class RatisManagerImpl extends PipelineManager {
XceiverClientRatis.newXceiverClientRatis(pipeline, conf)) {
client.destroyPipeline();
}
- super.closePipeline(pipeline);
for (DatanodeDetails node : pipeline.getMachines()) {
// A node should always be the in ratis members list.
Preconditions.checkArgument(ratisMembers.remove(node));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
index 2573b9c..ed2fc2f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipelines.Node2PipelineMap;
import org.apache.hadoop.hdds.scm.pipelines.PipelineManager;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -37,7 +36,6 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
-import java.util.Map;
/**
* Standalone Manager Impl to prove that pluggable interface
@@ -58,9 +56,8 @@ public class StandaloneManagerImpl extends PipelineManager {
* @param containerSize - Container Size.
*/
public StandaloneManagerImpl(NodeManager nodeManager,
- ContainerPlacementPolicy placementPolicy, long containerSize,
- Node2PipelineMap map, Map<PipelineID, Pipeline> pipelineMap) {
- super(map, pipelineMap);
+ ContainerPlacementPolicy placementPolicy, long containerSize) {
+ super();
this.nodeManager = nodeManager;
this.placementPolicy = placementPolicy;
this.containerSize = containerSize;
@@ -105,7 +102,6 @@ public class StandaloneManagerImpl extends PipelineManager {
* Close the pipeline.
*/
public void closePipeline(Pipeline pipeline) throws IOException {
- super.closePipeline(pipeline);
for (DatanodeDetails node : pipeline.getMachines()) {
// A node should always be the in standalone members list.
Preconditions.checkArgument(standAloneMembers.remove(node));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
index d74a32f..a59179b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerReportHandler.java
@@ -75,6 +75,7 @@ public class TestContainerReportHandler implements EventPublisher {
OzoneConfiguration conf = new OzoneConfiguration();
Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
Mapping mapping = Mockito.mock(Mapping.class);
+ PipelineSelector selector = Mockito.mock(PipelineSelector.class);
when(mapping.getContainer(anyLong()))
.thenAnswer(
@@ -87,7 +88,7 @@ public class TestContainerReportHandler implements EventPublisher {
);
ContainerStateManager containerStateManager =
- new ContainerStateManager(conf, mapping);
+ new ContainerStateManager(conf, mapping, selector);
when(mapping.getStateManager()).thenReturn(containerStateManager);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
index fe92ee5..b857740 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManager.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -41,7 +42,8 @@ public class TestContainerStateManager {
public void init() throws IOException {
OzoneConfiguration conf = new OzoneConfiguration();
Mapping mapping = Mockito.mock(Mapping.class);
- containerStateManager = new ContainerStateManager(conf, mapping);
+ PipelineSelector selector = Mockito.mock(PipelineSelector.class);
+ containerStateManager = new ContainerStateManager(conf, mapping, selector);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 0b69f5f..5ca9cb7 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.Node2ContainerMap;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.junit.Assert;
@@ -60,7 +61,8 @@ public class TestDeadNodeHandler {
Node2ContainerMap node2ContainerMap = new Node2ContainerMap();
ContainerStateManager containerStateManager = new ContainerStateManager(
new OzoneConfiguration(),
- Mockito.mock(Mapping.class)
+ Mockito.mock(Mapping.class),
+ Mockito.mock(PipelineSelector.class)
);
ContainerInfo container1 =
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
index c6e819b..422a7de 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerStateManagerIntegration.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.states.ContainerStateMap;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -54,8 +55,9 @@ public class TestContainerStateManagerIntegration {
private MiniOzoneCluster cluster;
private XceiverClientManager xceiverClientManager;
private StorageContainerManager scm;
- private Mapping scmContainerMapping;
+ private ContainerMapping scmContainerMapping;
private ContainerStateManager containerStateManager;
+ private PipelineSelector selector;
private String containerOwner = "OZONE";
@@ -66,8 +68,9 @@ public class TestContainerStateManagerIntegration {
cluster.waitForClusterToBeReady();
xceiverClientManager = new XceiverClientManager(conf);
scm = cluster.getStorageContainerManager();
- scmContainerMapping = scm.getScmContainerManager();
+ scmContainerMapping = (ContainerMapping) scm.getScmContainerManager();
containerStateManager = scmContainerMapping.getStateManager();
+ selector = scmContainerMapping.getPipelineSelector();
}
@After
@@ -133,8 +136,7 @@ public class TestContainerStateManagerIntegration {
// New instance of ContainerStateManager should load all the containers in
// container store.
ContainerStateManager stateManager =
- new ContainerStateManager(conf, scmContainerMapping
- );
+ new ContainerStateManager(conf, scmContainerMapping, selector);
int matchCount = stateManager
.getMatchingContainerIDs(containerOwner,
xceiverClientManager.getType(), xceiverClientManager.getFactor(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index b8cb9970..aefa6b0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -30,14 +30,13 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.MiniOzoneCluster;
-import org.junit.AfterClass;
+import org.junit.After;
import org.junit.Assert;
-import org.junit.BeforeClass;
+import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
-import java.util.NavigableSet;
import java.util.Set;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
@@ -60,8 +59,8 @@ public class TestNode2PipelineMap {
*
* @throws IOException
*/
- @BeforeClass
- public static void init() throws Exception {
+ @Before
+ public void init() throws Exception {
conf = new OzoneConfiguration();
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
cluster.waitForClusterToBeReady();
@@ -75,8 +74,8 @@ public class TestNode2PipelineMap {
/**
* Shutdown MiniDFSCluster.
*/
- @AfterClass
- public static void shutdown() {
+ @After
+ public void shutdown() {
if (cluster != null) {
cluster.shutdown();
}
@@ -86,19 +85,20 @@ public class TestNode2PipelineMap {
@Test
public void testPipelineMap() throws IOException {
- NavigableSet<ContainerID> set = stateMap.getOpenContainerIDsByPipeline(
+ Set<ContainerID> set = pipelineSelector.getOpenContainerIDsByPipeline(
ratisContainer.getPipeline().getId());
long cId = ratisContainer.getContainerInfo().getContainerID();
Assert.assertEquals(1, set.size());
- Assert.assertEquals(cId, set.first().getId());
+ set.forEach(containerID ->
+ Assert.assertEquals(containerID, ContainerID.valueof(cId)));
List<DatanodeDetails> dns = ratisContainer.getPipeline().getMachines();
Assert.assertEquals(3, dns.size());
// get pipeline details by dnid
Set<PipelineID> pipelines = mapping.getPipelineSelector()
- .getNode2PipelineMap().getPipelines(dns.get(0).getUuid());
+ .getPipelineId(dns.get(0).getUuid());
Assert.assertEquals(1, pipelines.size());
pipelines.forEach(p -> Assert.assertEquals(p,
ratisContainer.getPipeline().getId()));
@@ -114,7 +114,7 @@ public class TestNode2PipelineMap {
.updateContainerState(cId, HddsProtos.LifeCycleEvent.FINALIZE);
mapping
.updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);
- NavigableSet<ContainerID> set2 = stateMap.getOpenContainerIDsByPipeline(
+ Set<ContainerID> set2 = pipelineSelector.getOpenContainerIDsByPipeline(
ratisContainer.getPipeline().getId());
Assert.assertEquals(0, set2.size());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index 0f8f925..a5828e1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -35,7 +35,7 @@ import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
-import java.util.NavigableSet;
+import java.util.Set;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos
@@ -88,12 +88,13 @@ public class TestPipelineClose {
@Test
public void testPipelineCloseWithClosedContainer() throws IOException {
- NavigableSet<ContainerID> set = stateMap.getOpenContainerIDsByPipeline(
+ Set<ContainerID> set = pipelineSelector.getOpenContainerIDsByPipeline(
ratisContainer1.getPipeline().getId());
long cId = ratisContainer1.getContainerInfo().getContainerID();
Assert.assertEquals(1, set.size());
- Assert.assertEquals(cId, set.first().getId());
+ set.forEach(containerID ->
+ Assert.assertEquals(containerID, ContainerID.valueof(cId)));
// Now close the container and it should not show up while fetching
// containers by pipeline
@@ -106,7 +107,7 @@ public class TestPipelineClose {
mapping
.updateContainerState(cId, HddsProtos.LifeCycleEvent.CLOSE);
- NavigableSet<ContainerID> setClosed = stateMap.getOpenContainerIDsByPipeline(
+ Set<ContainerID> setClosed = pipelineSelector.getOpenContainerIDsByPipeline(
ratisContainer1.getPipeline().getId());
Assert.assertEquals(0, setClosed.size());
@@ -118,15 +119,15 @@ public class TestPipelineClose {
HddsProtos.LifeCycleState.CLOSED);
for (DatanodeDetails dn : ratisContainer1.getPipeline().getMachines()) {
// Assert that the pipeline has been removed from Node2PipelineMap as well
- Assert.assertEquals(pipelineSelector.getNode2PipelineMap()
- .getPipelines(dn.getUuid()).size(), 0);
+ Assert.assertEquals(pipelineSelector.getPipelineId(
+ dn.getUuid()).size(), 0);
}
}
@Test
public void testPipelineCloseWithOpenContainer() throws IOException,
TimeoutException, InterruptedException {
- NavigableSet<ContainerID> setOpen = stateMap.getOpenContainerIDsByPipeline(
+ Set<ContainerID> setOpen = pipelineSelector.getOpenContainerIDsByPipeline(
ratisContainer2.getPipeline().getId());
Assert.assertEquals(1, setOpen.size());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
new file mode 100644
index 0000000..3999d76
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.scm.container.ContainerMapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.protocol.proto
+ .HddsProtos.ReplicationFactor.THREE;
+import static org.apache.hadoop.hdds.protocol.proto
+ .HddsProtos.ReplicationType.RATIS;
+
+/**
+ * Test SCM restart and recovery wrt pipelines.
+ */
+public class TestSCMRestart {
+
+ private static MiniOzoneCluster cluster;
+ private static OzoneConfiguration conf;
+ private static Pipeline ratisPipeline1;
+ private static Pipeline ratisPipeline2;
+ private static ContainerMapping mapping;
+ private static ContainerMapping newMapping;
+
+ /**
+ * Create a MiniOzoneCluster for testing.
+ *
+ * @throws IOException
+ */
+ @BeforeClass
+ public static void init() throws Exception {
+ conf = new OzoneConfiguration();
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(6)
+ .setHbInterval(1000)
+ .setHbProcessorInterval(1000)
+ .build();
+ cluster.waitForClusterToBeReady();
+ StorageContainerManager scm = cluster.getStorageContainerManager();
+ mapping = (ContainerMapping)scm.getScmContainerManager();
+ ratisPipeline1 =
+ mapping.allocateContainer(RATIS, THREE, "Owner1").getPipeline();
+ ratisPipeline2 =
+ mapping.allocateContainer(RATIS, ONE, "Owner2").getPipeline();
+ // At this stage, there should be 2 pipelines, each with 1 open
+ // container. Restart the SCM and then verify that the pipelines are in
+ // the correct state.
+ cluster.restartStorageContainerManager();
+ newMapping = (ContainerMapping)(cluster.getStorageContainerManager()
+ .getScmContainerManager());
+ }
+
+ /**
+ * Shut down the MiniOzoneCluster.
+ */
+ @AfterClass
+ public static void shutdown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testPipelineWithScmRestart() {
+ // After restart, make sure that the pipelines are still present
+ Pipeline ratisPipeline1AfterRestart = newMapping.getPipelineSelector()
+ .getPipeline(ratisPipeline1.getId());
+ Pipeline ratisPipeline2AfterRestart = newMapping.getPipelineSelector()
+ .getPipeline(ratisPipeline2.getId());
+ Assert.assertNotSame(ratisPipeline1AfterRestart, ratisPipeline1);
+ Assert.assertNotSame(ratisPipeline2AfterRestart, ratisPipeline2);
+ Assert.assertEquals(ratisPipeline1AfterRestart, ratisPipeline1);
+ Assert.assertEquals(ratisPipeline2AfterRestart, ratisPipeline2);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 3cba839..d13efb4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -138,8 +138,11 @@ public interface MiniOzoneCluster {
* Restarts StorageContainerManager instance.
*
* @throws IOException
+ * @throws TimeoutException
+ * @throws InterruptedException
*/
- void restartStorageContainerManager() throws IOException;
+ void restartStorageContainerManager() throws InterruptedException,
+ TimeoutException, IOException;
/**
* Restarts OzoneManager instance.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/84693669/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index c2169a3..b34a7d1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -85,7 +85,7 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
LoggerFactory.getLogger(MiniOzoneClusterImpl.class);
private final OzoneConfiguration conf;
- private final StorageContainerManager scm;
+ private StorageContainerManager scm;
private final OzoneManager ozoneManager;
private final List<HddsDatanodeService> hddsDatanodes;
@@ -215,9 +215,13 @@ public final class MiniOzoneClusterImpl implements MiniOzoneCluster {
}
@Override
- public void restartStorageContainerManager() throws IOException {
+ public void restartStorageContainerManager()
+ throws TimeoutException, InterruptedException, IOException {
scm.stop();
+ scm.join();
+ scm = StorageContainerManager.createSCM(null, conf);
scm.start();
+ waitForClusterToBeReady();
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org