You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by xy...@apache.org on 2020/07/10 22:45:33 UTC
[hadoop-ozone] branch HDDS-2823 updated: HDDS-3662 Decouple
finalizeAndDestroyPipeline. (#1049)
This is an automated email from the ASF dual-hosted git repository.
xyao pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new 565dabc HDDS-3662 Decouple finalizeAndDestroyPipeline. (#1049)
565dabc is described below
commit 565dabc9237a646a8ddb9c9aa4784c99ed7a56c0
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Sat Jul 11 06:45:21 2020 +0800
HDDS-3662 Decouple finalizeAndDestroyPipeline. (#1049)
---
.../hadoop/hdds/scm/node/DeadNodeHandler.java | 2 +-
.../hadoop/hdds/scm/node/StaleNodeHandler.java | 2 +-
.../hdds/scm/pipeline/PipelineActionHandler.java | 4 +-
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 3 +-
.../hdds/scm/pipeline/PipelineManagerV2Impl.java | 160 +++++++++++----------
.../hdds/scm/pipeline/SCMPipelineManager.java | 141 +++++++++---------
.../hdds/scm/server/SCMClientProtocolServer.java | 2 +-
.../hadoop/hdds/scm/block/TestBlockManager.java | 2 +-
.../hdds/scm/pipeline/MockPipelineManager.java | 3 +-
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 6 +-
.../hdds/scm/pipeline/TestSCMPipelineManager.java | 6 +-
.../hdds/scm/pipeline/TestNode2PipelineMap.java | 2 +-
.../hdds/scm/pipeline/TestPipelineClose.java | 4 +-
.../TestRatisPipelineCreateAndDestroy.java | 6 +-
.../apache/hadoop/ozone/container/TestHelper.java | 4 +-
.../TestCloseContainerByPipeline.java | 4 +-
.../ozone/freon/TestFreonWithDatanodeRestart.java | 1 +
.../ozone/freon/TestFreonWithPipelineDestroy.java | 2 +-
.../hadoop/ozone/recon/TestReconAsPassiveScm.java | 2 +-
.../TestSCMContainerPlacementPolicyMetrics.java | 2 +-
.../ozone/scm/pipeline/TestSCMPipelineMetrics.java | 3 +-
.../ozone/recon/scm/ReconPipelineManager.java | 9 +-
22 files changed, 180 insertions(+), 190 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index 17e1fed..fde2286 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -98,7 +98,7 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
.ifPresent(pipelines ->
pipelines.forEach(id -> {
try {
- pipelineManager.finalizeAndDestroyPipeline(
+ pipelineManager.closePipeline(
pipelineManager.getPipeline(id), false);
} catch (PipelineNotFoundException ignore) {
// Pipeline is not there in pipeline manager,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
index 5530e73..dd8cea3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
@@ -60,7 +60,7 @@ public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
for (PipelineID pipelineID : pipelineIds) {
try {
Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
- pipelineManager.finalizeAndDestroyPipeline(pipeline, true);
+ pipelineManager.closePipeline(pipeline, true);
} catch (IOException e) {
LOG.info("Could not finalize pipeline={} for dn={}", pipelineID,
datanodeDetails);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
index 0720694..e719adb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -79,8 +79,8 @@ public class PipelineActionHandler
info.getDetailedReason());
if (action == PipelineAction.Action.CLOSE) {
- pipelineManager.finalizeAndDestroyPipeline(
- pipelineManager.getPipeline(pid), true);
+ pipelineManager.closePipeline(
+ pipelineManager.getPipeline(pid), false);
} else {
LOG.error("unknown pipeline action:{}", action);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 48068d8..02e195f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -77,8 +77,7 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean,
void openPipeline(PipelineID pipelineId) throws IOException;
- void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
- throws IOException;
+ void closePipeline(Pipeline pipeline, boolean onTimeout) throws IOException;
void scrubPipeline(ReplicationType type, ReplicationFactor factor)
throws IOException;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 3732add..85654aa 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
@@ -47,11 +48,11 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
+import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
/**
* SCM Pipeline Manager implementation.
@@ -68,6 +69,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
private Scheduler scheduler;
private BackgroundPipelineCreator backgroundPipelineCreator;
private final ConfigurationSource conf;
+ private final EventPublisher eventPublisher;
// Pipeline Manager MXBean
private ObjectName pmInfoBean;
private final SCMPipelineMetrics metrics;
@@ -80,11 +82,13 @@ public final class PipelineManagerV2Impl implements PipelineManager {
private PipelineManagerV2Impl(ConfigurationSource conf,
NodeManager nodeManager,
StateManager pipelineStateManager,
- PipelineFactory pipelineFactory) {
+ PipelineFactory pipelineFactory,
+ EventPublisher eventPublisher) {
this.lock = new ReentrantReadWriteLock();
this.pipelineFactory = pipelineFactory;
this.stateManager = pipelineStateManager;
this.conf = conf;
+ this.eventPublisher = eventPublisher;
this.pmInfoBean = MBeans.register("SCMPipelineManager",
"SCMPipelineManagerInfo", this);
this.metrics = SCMPipelineMetrics.create();
@@ -116,7 +120,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
nodeManager, stateManager, conf, eventPublisher);
// Create PipelineManager
PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf,
- nodeManager, stateManager, pipelineFactory);
+ nodeManager, stateManager, pipelineFactory, eventPublisher);
// Create background thread.
Scheduler scheduler = new Scheduler(
@@ -157,7 +161,14 @@ public final class PipelineManagerV2Impl implements PipelineManager {
@Override
public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
List<DatanodeDetails> nodes) {
- return null;
+ // This will mostly be used to create dummy pipeline for SimplePipelines.
+ // We don't update the metrics for SimplePipelines.
+ lock.writeLock().lock();
+ try {
+ return pipelineFactory.create(type, factor, nodes);
+ } finally {
+ lock.writeLock().unlock();
+ }
}
@Override
@@ -310,97 +321,77 @@ public final class PipelineManagerV2Impl implements PipelineManager {
}
/**
- * Finalizes pipeline in the SCM. Removes pipeline and makes rpc call to
- * destroy pipeline on the datanodes immediately or after timeout based on the
- * value of onTimeout parameter.
- *
- * @param pipeline - Pipeline to be destroyed
- * @param onTimeout - if true pipeline is removed and destroyed on
- * datanodes after timeout
- * @throws IOException
- */
- @Override
- public void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
- throws IOException {
- LOG.info("Destroying pipeline:{}", pipeline);
- finalizePipeline(pipeline.getId());
- if (onTimeout) {
- long pipelineDestroyTimeoutInMillis =
- conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
- ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
- TimeUnit.MILLISECONDS);
- scheduler.schedule(() -> destroyPipeline(pipeline),
- pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG,
- String.format("Destroy pipeline failed for pipeline:%s", pipeline));
- } else {
- destroyPipeline(pipeline);
- }
- }
-
- /**
- * Moves the pipeline to CLOSED state and sends close container command for
- * all the containers in the pipeline.
+ * Removes the pipeline from the db and pipeline state map.
*
- * @param pipelineId - ID of the pipeline to be moved to CLOSED state.
+ * @param pipeline - pipeline to be removed
* @throws IOException
*/
- private void finalizePipeline(PipelineID pipelineId) throws IOException {
+ protected void removePipeline(Pipeline pipeline) throws IOException {
+ pipelineFactory.close(pipeline.getType(), pipeline);
+ PipelineID pipelineID = pipeline.getId();
lock.writeLock().lock();
try {
- Pipeline pipeline = stateManager.getPipeline(pipelineId);
- if (!pipeline.isClosed()) {
- stateManager.updatePipelineState(
- pipelineId.getProtobuf(), HddsProtos.PipelineState.PIPELINE_CLOSED);
- LOG.info("Pipeline {} moved to CLOSED state", pipeline);
- }
-
- // TODO fire events to datanodes for closing pipelines
-// Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
-// for (ContainerID containerID : containerIDs) {
-// eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
-// }
- metrics.removePipelineMetrics(pipelineId);
+ stateManager.removePipeline(pipelineID.getProtobuf());
+ metrics.incNumPipelineDestroyed();
+ } catch (IOException ex) {
+ metrics.incNumPipelineDestroyFailed();
+ throw ex;
} finally {
lock.writeLock().unlock();
}
}
/**
- * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
- * the datanodes for ratis pipelines.
- *
- * @param pipeline - Pipeline to be destroyed
+ * Fire events to close all containers related to the input pipeline.
+ * @param pipelineId - ID of the pipeline.
* @throws IOException
*/
- protected void destroyPipeline(Pipeline pipeline) throws IOException {
- pipelineFactory.close(pipeline.getType(), pipeline);
- // remove the pipeline from the pipeline manager
- removePipeline(pipeline.getId());
- triggerPipelineCreation();
+ protected void closeContainersForPipeline(final PipelineID pipelineId)
+ throws IOException {
+ Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
+ for (ContainerID containerID : containerIDs) {
+ eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
+ }
}
/**
- * Removes the pipeline from the db and pipeline state map.
- *
- * @param pipelineId - ID of the pipeline to be removed
+ * Moves the pipeline to CLOSED state and closes its containers.
+ * @param pipeline - pipeline to be closed.
+ * @param onTimeout - if false, the pipeline is also removed right away.
* @throws IOException
*/
- protected void removePipeline(PipelineID pipelineId) throws IOException {
+ @Override
+ public void closePipeline(Pipeline pipeline, boolean onTimeout)
+ throws IOException {
+ PipelineID pipelineID = pipeline.getId();
lock.writeLock().lock();
try {
- stateManager.removePipeline(pipelineId.getProtobuf());
- metrics.incNumPipelineDestroyed();
- } catch (IOException ex) {
- metrics.incNumPipelineDestroyFailed();
- throw ex;
+ if (!pipeline.isClosed()) {
+ stateManager.updatePipelineState(pipelineID.getProtobuf(),
+ HddsProtos.PipelineState.PIPELINE_CLOSED);
+ LOG.info("Pipeline {} moved to CLOSED state", pipeline);
+ }
+ metrics.removePipelineMetrics(pipelineID);
} finally {
lock.writeLock().unlock();
}
+ // close containers.
+ closeContainersForPipeline(pipelineID);
+ if (!onTimeout) {
+ // close pipeline right away.
+ removePipeline(pipeline);
+ }
}
+ /**
+ * Scrub pipelines.
+ * @param type Pipeline type
+ * @param factor Pipeline factor
+ * @throws IOException
+ */
@Override
public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
- throws IOException{
+ throws IOException {
if (type != ReplicationType.RATIS || factor != ReplicationFactor.THREE) {
// Only srub pipeline for RATIS THREE pipeline
return;
@@ -410,18 +401,29 @@ public final class PipelineManagerV2Impl implements PipelineManager {
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
- List<Pipeline> needToSrubPipelines = stateManager.getPipelines(type, factor,
- Pipeline.PipelineState.ALLOCATED).stream()
- .filter(p -> currentTime.toEpochMilli() - p.getCreationTimestamp()
- .toEpochMilli() >= pipelineScrubTimeoutInMills)
- .collect(Collectors.toList());
- for (Pipeline p : needToSrubPipelines) {
- LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
- " since it stays at ALLOCATED stage for " +
- Duration.between(currentTime, p.getCreationTimestamp()).toMinutes() +
- " mins.");
- finalizeAndDestroyPipeline(p, false);
+
+ List<Pipeline> candidates = stateManager.getPipelines(type, factor);
+
+ for (Pipeline p : candidates) {
+ // scrub pipelines that stay ALLOCATED for too long.
+ if (p.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
+ (currentTime.toEpochMilli() - p.getCreationTimestamp()
+ .toEpochMilli() >= pipelineScrubTimeoutInMills)) {
+ LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
+ " since it stays at ALLOCATED stage for " +
+ Duration.between(currentTime, p.getCreationTimestamp())
+ .toMinutes() + " mins.");
+ closePipeline(p, false);
+ }
+ // scrub pipelines that are already CLOSED (no timeout is checked).
+ if (p.getPipelineState() == Pipeline.PipelineState.CLOSED) {
+ LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
+ " since it stays at CLOSED stage.");
+ closeContainersForPipeline(p.getId());
+ removePipeline(p);
+ }
}
+ return;
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 50754db..365d8ee 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -382,33 +381,53 @@ public class SCMPipelineManager implements PipelineManager {
}
/**
- * Finalizes pipeline in the SCM. Removes pipeline and makes rpc call to
- * destroy pipeline on the datanodes immediately or after timeout based on the
- * value of onTimeout parameter.
- *
- * @param pipeline - Pipeline to be destroyed
- * @param onTimeout - if true pipeline is removed and destroyed on
- * datanodes after timeout
+ * Fire events to close all containers related to the input pipeline.
+ * @param pipelineId - ID of the pipeline.
+ * @throws IOException
+ */
+ protected void closeContainersForPipeline(final PipelineID pipelineId)
+ throws IOException {
+ Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
+ for (ContainerID containerID : containerIDs) {
+ eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
+ }
+ }
+
+ /**
+ * Moves the pipeline to CLOSED state and closes its containers.
+ * @param pipeline - pipeline to be closed.
+ * @param onTimeout - if false, the pipeline is also removed right away.
* @throws IOException
*/
@Override
- public void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
+ public void closePipeline(Pipeline pipeline, boolean onTimeout)
throws IOException {
- LOG.info("Destroying pipeline:{}", pipeline);
- finalizePipeline(pipeline.getId());
- if (onTimeout) {
- long pipelineDestroyTimeoutInMillis =
- conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
- ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
- TimeUnit.MILLISECONDS);
- scheduler.schedule(() -> destroyPipeline(pipeline),
- pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG,
- String.format("Destroy pipeline failed for pipeline:%s", pipeline));
- } else {
- destroyPipeline(pipeline);
+ PipelineID pipelineID = pipeline.getId();
+ lock.writeLock().lock();
+ try {
+ if (!pipeline.isClosed()) {
+ stateManager.updatePipelineState(pipelineID,
+ Pipeline.PipelineState.CLOSED);
+ LOG.info("Pipeline {} moved to CLOSED state", pipeline);
+ }
+ metrics.removePipelineMetrics(pipelineID);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ // close containers.
+ closeContainersForPipeline(pipelineID);
+ if (!onTimeout) {
+ // close pipeline right away.
+ removePipeline(pipeline);
}
}
+ /**
+ * Scrub pipelines.
+ * @param type Pipeline type
+ * @param factor Pipeline factor
+ * @throws IOException
+ */
@Override
public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
throws IOException{
@@ -421,18 +440,29 @@ public class SCMPipelineManager implements PipelineManager {
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
- List<Pipeline> needToSrubPipelines = stateManager.getPipelines(type, factor,
- Pipeline.PipelineState.ALLOCATED).stream()
- .filter(p -> currentTime.toEpochMilli() - p.getCreationTimestamp()
- .toEpochMilli() >= pipelineScrubTimeoutInMills)
- .collect(Collectors.toList());
- for (Pipeline p : needToSrubPipelines) {
- LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
- " since it stays at ALLOCATED stage for " +
- Duration.between(currentTime, p.getCreationTimestamp()).toMinutes() +
- " mins.");
- finalizeAndDestroyPipeline(p, false);
+
+ List<Pipeline> candidates = stateManager.getPipelines(type, factor);
+
+ for (Pipeline p : candidates) {
+ // scrub pipelines that stay ALLOCATED for too long.
+ if (p.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
+ (currentTime.toEpochMilli() - p.getCreationTimestamp()
+ .toEpochMilli() >= pipelineScrubTimeoutInMills)) {
+ LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
+ " since it stays at ALLOCATED stage for " +
+ Duration.between(currentTime, p.getCreationTimestamp())
+ .toMinutes() + " mins.");
+ closePipeline(p, false);
+ }
+ // scrub pipelines that are already CLOSED (no timeout is checked).
+ if (p.getPipelineState() == Pipeline.PipelineState.CLOSED) {
+ LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
+ " since it is at CLOSED stage.");
+ closeContainersForPipeline(p.getId());
+ removePipeline(p);
+ }
}
+ return;
}
@Override
@@ -528,53 +558,20 @@ public class SCMPipelineManager implements PipelineManager {
}
/**
- * Moves the pipeline to CLOSED state and sends close container command for
- * all the containers in the pipeline.
- *
- * @param pipelineId - ID of the pipeline to be moved to CLOSED state.
- * @throws IOException
- */
- private void finalizePipeline(PipelineID pipelineId) throws IOException {
- lock.writeLock().lock();
- try {
- stateManager.finalizePipeline(pipelineId);
- Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
- for (ContainerID containerID : containerIDs) {
- eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
- }
- metrics.removePipelineMetrics(pipelineId);
- } finally {
- lock.writeLock().unlock();
- }
- }
-
- /**
- * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
- * the datanodes for ratis pipelines.
- *
- * @param pipeline - Pipeline to be destroyed
- * @throws IOException
- */
- protected void destroyPipeline(Pipeline pipeline) throws IOException {
- pipelineFactory.close(pipeline.getType(), pipeline);
- // remove the pipeline from the pipeline manager
- removePipeline(pipeline.getId());
- triggerPipelineCreation();
- }
-
- /**
* Removes the pipeline from the db and pipeline state map.
*
- * @param pipelineId - ID of the pipeline to be removed
+ * @param pipeline - pipeline to be removed
* @throws IOException
*/
- protected void removePipeline(PipelineID pipelineId) throws IOException {
+ protected void removePipeline(Pipeline pipeline) throws IOException {
+ pipelineFactory.close(pipeline.getType(), pipeline);
+ PipelineID pipelineID = pipeline.getId();
lock.writeLock().lock();
try {
if (pipelineStore != null) {
- pipelineStore.delete(pipelineId);
- Pipeline pipeline = stateManager.removePipeline(pipelineId);
- nodeManager.removePipeline(pipeline);
+ pipelineStore.delete(pipelineID);
+ Pipeline pipelineRemoved = stateManager.removePipeline(pipelineID);
+ nodeManager.removePipeline(pipelineRemoved);
metrics.incNumPipelineDestroyed();
}
} catch (IOException ex) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index df5c147..ede679d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -475,7 +475,7 @@ public class SCMClientProtocolServer implements
PipelineManager pipelineManager = scm.getPipelineManager();
Pipeline pipeline =
pipelineManager.getPipeline(PipelineID.getFromProtobuf(pipelineID));
- pipelineManager.finalizeAndDestroyPipeline(pipeline, true);
+ pipelineManager.closePipeline(pipeline, true);
AUDIT.logWriteSuccess(
buildAuditMessageForSuccess(SCMAction.CLOSE_PIPELINE, null)
);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index c89065c..cf6f0ed 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -417,7 +417,7 @@ public class TestBlockManager {
public void testBlockAllocationWithNoAvailablePipelines()
throws IOException, TimeoutException, InterruptedException {
for (Pipeline pipeline : pipelineManager.getPipelines()) {
- pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+ pipelineManager.closePipeline(pipeline, false);
}
Assert.assertEquals(0, pipelineManager.getPipelines(type, factor).size());
Assert.assertNotNull(blockManager
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 5dd6082..6292ad4 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -165,8 +165,7 @@ public final class MockPipelineManager implements PipelineManager {
}
@Override
- public void finalizeAndDestroyPipeline(final Pipeline pipeline,
- final boolean onTimeout)
+ public void closePipeline(final Pipeline pipeline, final boolean onTimeout)
throws IOException {
stateManager.finalizePipeline(pipeline.getId());
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index f8eeb6e..d5292e3 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -185,7 +185,7 @@ public class TestPipelineManagerImpl {
Pipeline.PipelineState.OPEN).contains(pipeline));
try {
- pipelineManager.removePipeline(pipeline.getId());
+ pipelineManager.removePipeline(pipeline);
fail();
} catch (IOException ioe) {
// Should not be able to remove the OPEN pipeline.
@@ -195,7 +195,7 @@ public class TestPipelineManagerImpl {
}
// Destroy pipeline
- pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+ pipelineManager.closePipeline(pipeline, false);
try {
pipelineManager.getPipeline(pipeline.getId());
fail("Pipeline should not have been retrieved");
@@ -238,7 +238,7 @@ public class TestPipelineManagerImpl {
.assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
// close the pipeline
- pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+ pipelineManager.closePipeline(pipeline, false);
// pipeline report for destroyed pipeline should be ignored
nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 7c2f17e..08c1a20 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -140,7 +140,7 @@ public class TestSCMPipelineManager {
// clean up
for (Pipeline pipeline : pipelines) {
- pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+ pipelineManager.closePipeline(pipeline, false);
}
pipelineManager.close();
}
@@ -163,7 +163,7 @@ public class TestSCMPipelineManager {
pipelineManager.openPipeline(pipeline.getId());
pipelineManager
.addContainerToPipeline(pipeline.getId(), ContainerID.valueof(1));
- pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+ pipelineManager.closePipeline(pipeline, false);
pipelineManager.close();
// new pipeline manager should not be able to load removed pipelines
@@ -228,7 +228,7 @@ public class TestSCMPipelineManager {
.assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
// close the pipeline
- pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+ pipelineManager.closePipeline(pipeline, false);
// pipeline report for destroyed pipeline should be ignored
nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index 42acb12..532f400 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -121,7 +121,7 @@ public class TestNode2PipelineMap {
Assert.assertEquals(0, set2.size());
pipelineManager
- .finalizeAndDestroyPipeline(ratisContainer.getPipeline(), false);
+ .closePipeline(ratisContainer.getPipeline(), false);
pipelines = scm.getScmNodeManager()
.getPipelines(dns.get(0));
Assert
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index 346fc0e..0ee0101 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -137,7 +137,7 @@ public class TestPipelineClose {
Assert.assertEquals(0, setClosed.size());
pipelineManager
- .finalizeAndDestroyPipeline(ratisContainer.getPipeline(), false);
+ .closePipeline(ratisContainer.getPipeline(), false);
for (DatanodeDetails dn : ratisContainer.getPipeline().getNodes()) {
// Assert that the pipeline has been removed from Node2PipelineMap as well
Assert.assertFalse(scm.getScmNodeManager().getPipelines(dn)
@@ -153,7 +153,7 @@ public class TestPipelineClose {
Assert.assertEquals(1, setOpen.size());
pipelineManager
- .finalizeAndDestroyPipeline(ratisContainer.getPipeline(), false);
+ .closePipeline(ratisContainer.getPipeline(), false);
GenericTestUtils.waitFor(() -> {
try {
return containerManager
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index bd677db..08a29f2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -86,7 +86,7 @@ public class TestRatisPipelineCreateAndDestroy {
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN);
for (Pipeline pipeline : pipelines) {
- pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+ pipelineManager.closePipeline(pipeline, false);
}
// make sure two pipelines are created
waitForPipelines(2);
@@ -108,7 +108,7 @@ public class TestRatisPipelineCreateAndDestroy {
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN);
for (Pipeline pipeline : pipelines) {
- pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+ pipelineManager.closePipeline(pipeline, false);
}
// make sure two pipelines are created
@@ -152,7 +152,7 @@ public class TestRatisPipelineCreateAndDestroy {
// destroy the existing pipelines
for (Pipeline pipeline : pipelines) {
- pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+ pipelineManager.closePipeline(pipeline, false);
}
if (cluster.getStorageContainerManager()
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index 12ffce6..fab2ea3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -206,8 +206,8 @@ public final class TestHelper {
throws TimeoutException, InterruptedException, IOException {
for (Pipeline pipeline1 : pipelineList) {
// issue pipeline destroy command
- cluster.getStorageContainerManager().getPipelineManager()
- .finalizeAndDestroyPipeline(pipeline1, false);
+ cluster.getStorageContainerManager()
+ .getPipelineManager().closePipeline(pipeline1, false);
}
// wait for the pipeline to get destroyed in the datanodes
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index bb2d57f..6b40179 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -204,7 +204,7 @@ public class TestCloseContainerByPipeline {
Assert.assertTrue(isContainerClosed(cluster, containerID, datanodeDetails));
cluster.getStorageContainerManager().getPipelineManager()
- .finalizeAndDestroyPipeline(pipeline, false);
+ .closePipeline(pipeline, false);
Thread.sleep(5000);
// Pipeline close should not affect a container in CLOSED state
Assert.assertTrue(isContainerClosed(cluster, containerID, datanodeDetails));
@@ -307,7 +307,7 @@ public class TestCloseContainerByPipeline {
// close the pipeline
cluster.getStorageContainerManager()
- .getPipelineManager().finalizeAndDestroyPipeline(pipeline, false);
+ .getPipelineManager().closePipeline(pipeline, false);
// All the containers in OPEN or CLOSING state should transition to
// QUASI-CLOSED after pipeline close
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
index fa25361..feb03ec 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
@@ -85,6 +85,7 @@ public class TestFreonWithDatanodeRestart {
RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY+ "." +
"watch.request.timeout",
3, TimeUnit.SECONDS);
+ conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 5);
cluster = MiniOzoneCluster.newBuilder(conf)
.setHbProcessorInterval(1000)
.setHbInterval(1000)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
index 5150fd4..109a49e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
@@ -105,6 +105,6 @@ public class TestFreonWithPipelineDestroy {
PipelineManager pipelineManager =
cluster.getStorageContainerManager().getPipelineManager();
Pipeline pipeline = pipelineManager.getPipeline(id);
- pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+ pipelineManager.closePipeline(pipeline, false);
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java
index 6e3dfe3..9092cc5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java
@@ -181,7 +181,7 @@ public class TestReconAsPassiveScm {
.filter(p -> !p.getId().equals(containerInfo.getPipelineID()))
.findFirst();
assertTrue(pipelineToClose.isPresent());
- scmPipelineManager.finalizeAndDestroyPipeline(pipelineToClose.get(), false);
+ scmPipelineManager.closePipeline(pipelineToClose.get(), false);
// Start Recon
cluster.startRecon();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
index 4025aca..5edd392 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
@@ -124,7 +124,7 @@ public class TestSCMContainerPlacementPolicyMetrics {
.collect(Collectors.toList());
Pipeline targetPipeline = pipelines.get(0);
List<DatanodeDetails> nodes = targetPipeline.getNodes();
- manager.finalizeAndDestroyPipeline(pipelines.get(0), true);
+ manager.closePipeline(pipelines.get(0), true);
// kill datanode to trigger under-replicated container replication
cluster.shutdownHddsDatanode(nodes.get(0));
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java
index 250a2b0..a1a816d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java
@@ -92,8 +92,7 @@ public class TestSCMPipelineMetrics {
try {
cluster.getStorageContainerManager()
.getPipelineManager()
- .finalizeAndDestroyPipeline(
- pipeline.get(), false);
+ .closePipeline(pipeline.get(), false);
} catch (IOException e) {
e.printStackTrace();
Assert.fail();
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
index beed591..a96212d 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
@@ -62,13 +62,6 @@ public class ReconPipelineManager extends SCMPipelineManager {
// Don't do anything in Recon.
}
- @Override
- protected void destroyPipeline(Pipeline pipeline) throws IOException {
- // remove the pipeline from the pipeline manager
- removePipeline(pipeline.getId());
- }
-
-
/**
* Bootstrap Recon's pipeline metadata with that from SCM.
* @param pipelinesFromScm pipelines from SCM.
@@ -124,7 +117,7 @@ public class ReconPipelineManager extends SCMPipelineManager {
}
try {
LOG.info("Removing invalid pipeline {} from Recon.", pipelineID);
- finalizeAndDestroyPipeline(p, false);
+ closePipeline(p, false);
} catch (IOException e) {
LOG.warn("Unable to remove pipeline {}", pipelineID, e);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org