You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by xy...@apache.org on 2020/07/10 22:45:33 UTC

[hadoop-ozone] branch HDDS-2823 updated: HDDS-3662 Decouple finalizeAndDestroyPipeline. (#1049)

This is an automated email from the ASF dual-hosted git repository.

xyao pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/HDDS-2823 by this push:
     new 565dabc  HDDS-3662 Decouple finalizeAndDestroyPipeline. (#1049)
565dabc is described below

commit 565dabc9237a646a8ddb9c9aa4784c99ed7a56c0
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Sat Jul 11 06:45:21 2020 +0800

    HDDS-3662 Decouple finalizeAndDestroyPipeline. (#1049)
---
 .../hadoop/hdds/scm/node/DeadNodeHandler.java      |   2 +-
 .../hadoop/hdds/scm/node/StaleNodeHandler.java     |   2 +-
 .../hdds/scm/pipeline/PipelineActionHandler.java   |   4 +-
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |   3 +-
 .../hdds/scm/pipeline/PipelineManagerV2Impl.java   | 160 +++++++++++----------
 .../hdds/scm/pipeline/SCMPipelineManager.java      | 141 +++++++++---------
 .../hdds/scm/server/SCMClientProtocolServer.java   |   2 +-
 .../hadoop/hdds/scm/block/TestBlockManager.java    |   2 +-
 .../hdds/scm/pipeline/MockPipelineManager.java     |   3 +-
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java |   6 +-
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  |   6 +-
 .../hdds/scm/pipeline/TestNode2PipelineMap.java    |   2 +-
 .../hdds/scm/pipeline/TestPipelineClose.java       |   4 +-
 .../TestRatisPipelineCreateAndDestroy.java         |   6 +-
 .../apache/hadoop/ozone/container/TestHelper.java  |   4 +-
 .../TestCloseContainerByPipeline.java              |   4 +-
 .../ozone/freon/TestFreonWithDatanodeRestart.java  |   1 +
 .../ozone/freon/TestFreonWithPipelineDestroy.java  |   2 +-
 .../hadoop/ozone/recon/TestReconAsPassiveScm.java  |   2 +-
 .../TestSCMContainerPlacementPolicyMetrics.java    |   2 +-
 .../ozone/scm/pipeline/TestSCMPipelineMetrics.java |   3 +-
 .../ozone/recon/scm/ReconPipelineManager.java      |   9 +-
 22 files changed, 180 insertions(+), 190 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index 17e1fed..fde2286 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -98,7 +98,7 @@ public class DeadNodeHandler implements EventHandler<DatanodeDetails> {
         .ifPresent(pipelines ->
             pipelines.forEach(id -> {
               try {
-                pipelineManager.finalizeAndDestroyPipeline(
+                pipelineManager.closePipeline(
                     pipelineManager.getPipeline(id), false);
               } catch (PipelineNotFoundException ignore) {
                 // Pipeline is not there in pipeline manager,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
index 5530e73..dd8cea3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/StaleNodeHandler.java
@@ -60,7 +60,7 @@ public class StaleNodeHandler implements EventHandler<DatanodeDetails> {
     for (PipelineID pipelineID : pipelineIds) {
       try {
         Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
-        pipelineManager.finalizeAndDestroyPipeline(pipeline, true);
+        pipelineManager.closePipeline(pipeline, true);
       } catch (IOException e) {
         LOG.info("Could not finalize pipeline={} for dn={}", pipelineID,
             datanodeDetails);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
index 0720694..e719adb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -79,8 +79,8 @@ public class PipelineActionHandler
           info.getDetailedReason());
 
       if (action == PipelineAction.Action.CLOSE) {
-        pipelineManager.finalizeAndDestroyPipeline(
-            pipelineManager.getPipeline(pid), true);
+        pipelineManager.closePipeline(
+            pipelineManager.getPipeline(pid), false);
       } else {
         LOG.error("unknown pipeline action:{}", action);
       }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 48068d8..02e195f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -77,8 +77,7 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean,
 
   void openPipeline(PipelineID pipelineId) throws IOException;
 
-  void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
-      throws IOException;
+  void closePipeline(Pipeline pipeline, boolean onTimeout) throws IOException;
 
   void scrubPipeline(ReplicationType type, ReplicationFactor factor)
       throws IOException;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 3732add..85654aa 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
@@ -47,11 +48,11 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 
 /**
  * SCM Pipeline Manager implementation.
@@ -68,6 +69,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
   private Scheduler scheduler;
   private BackgroundPipelineCreator backgroundPipelineCreator;
   private final ConfigurationSource conf;
+  private final EventPublisher eventPublisher;
   // Pipeline Manager MXBean
   private ObjectName pmInfoBean;
   private final SCMPipelineMetrics metrics;
@@ -80,11 +82,13 @@ public final class PipelineManagerV2Impl implements PipelineManager {
   private PipelineManagerV2Impl(ConfigurationSource conf,
                                NodeManager nodeManager,
                                StateManager pipelineStateManager,
-                               PipelineFactory pipelineFactory) {
+                               PipelineFactory pipelineFactory,
+                                EventPublisher eventPublisher) {
     this.lock = new ReentrantReadWriteLock();
     this.pipelineFactory = pipelineFactory;
     this.stateManager = pipelineStateManager;
     this.conf = conf;
+    this.eventPublisher = eventPublisher;
     this.pmInfoBean = MBeans.register("SCMPipelineManager",
         "SCMPipelineManagerInfo", this);
     this.metrics = SCMPipelineMetrics.create();
@@ -116,7 +120,7 @@ public final class PipelineManagerV2Impl implements PipelineManager {
         nodeManager, stateManager, conf, eventPublisher);
     // Create PipelineManager
     PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf,
-        nodeManager, stateManager, pipelineFactory);
+        nodeManager, stateManager, pipelineFactory, eventPublisher);
 
     // Create background thread.
     Scheduler scheduler = new Scheduler(
@@ -157,7 +161,14 @@ public final class PipelineManagerV2Impl implements PipelineManager {
   @Override
   public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
                           List<DatanodeDetails> nodes) {
-    return null;
+    // This will mostly be used to create dummy pipeline for SimplePipelines.
+    // We don't update the metrics for SimplePipelines.
+    lock.writeLock().lock();
+    try {
+      return pipelineFactory.create(type, factor, nodes);
+    } finally {
+      lock.writeLock().unlock();
+    }
   }
 
   @Override
@@ -310,97 +321,77 @@ public final class PipelineManagerV2Impl implements PipelineManager {
   }
 
   /**
-   * Finalizes pipeline in the SCM. Removes pipeline and makes rpc call to
-   * destroy pipeline on the datanodes immediately or after timeout based on the
-   * value of onTimeout parameter.
-   *
-   * @param pipeline        - Pipeline to be destroyed
-   * @param onTimeout       - if true pipeline is removed and destroyed on
-   *                        datanodes after timeout
-   * @throws IOException
-   */
-  @Override
-  public void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
-      throws IOException {
-    LOG.info("Destroying pipeline:{}", pipeline);
-    finalizePipeline(pipeline.getId());
-    if (onTimeout) {
-      long pipelineDestroyTimeoutInMillis =
-          conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
-              ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
-              TimeUnit.MILLISECONDS);
-      scheduler.schedule(() -> destroyPipeline(pipeline),
-          pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG,
-          String.format("Destroy pipeline failed for pipeline:%s", pipeline));
-    } else {
-      destroyPipeline(pipeline);
-    }
-  }
-
-  /**
-   * Moves the pipeline to CLOSED state and sends close container command for
-   * all the containers in the pipeline.
+   * Removes the pipeline from the db and pipeline state map.
    *
-   * @param pipelineId - ID of the pipeline to be moved to CLOSED state.
+   * @param pipeline - pipeline to be removed
    * @throws IOException
    */
-  private void finalizePipeline(PipelineID pipelineId) throws IOException {
+  protected void removePipeline(Pipeline pipeline) throws IOException {
+    pipelineFactory.close(pipeline.getType(), pipeline);
+    PipelineID pipelineID = pipeline.getId();
     lock.writeLock().lock();
     try {
-      Pipeline pipeline = stateManager.getPipeline(pipelineId);
-      if (!pipeline.isClosed()) {
-        stateManager.updatePipelineState(
-            pipelineId.getProtobuf(), HddsProtos.PipelineState.PIPELINE_CLOSED);
-        LOG.info("Pipeline {} moved to CLOSED state", pipeline);
-      }
-
-      // TODO fire events to datanodes for closing pipelines
-//      Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
-//      for (ContainerID containerID : containerIDs) {
-//        eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
-//      }
-      metrics.removePipelineMetrics(pipelineId);
+      stateManager.removePipeline(pipelineID.getProtobuf());
+      metrics.incNumPipelineDestroyed();
+    } catch (IOException ex) {
+      metrics.incNumPipelineDestroyFailed();
+      throw ex;
     } finally {
       lock.writeLock().unlock();
     }
   }
 
   /**
-   * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
-   * the datanodes for ratis pipelines.
-   *
-   * @param pipeline        - Pipeline to be destroyed
+   * Fire events to close all containers related to the input pipeline.
+   * @param pipelineId - ID of the pipeline.
    * @throws IOException
    */
-  protected void destroyPipeline(Pipeline pipeline) throws IOException {
-    pipelineFactory.close(pipeline.getType(), pipeline);
-    // remove the pipeline from the pipeline manager
-    removePipeline(pipeline.getId());
-    triggerPipelineCreation();
+  protected void closeContainersForPipeline(final PipelineID pipelineId)
+      throws IOException {
+    Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
+    for (ContainerID containerID : containerIDs) {
+      eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
+    }
   }
 
   /**
-   * Removes the pipeline from the db and pipeline state map.
-   *
-   * @param pipelineId - ID of the pipeline to be removed
+   * put pipeline in CLOSED state.
+   * @param pipeline - ID of the pipeline.
+   * @param onTimeout - whether to remove pipeline after some time.
    * @throws IOException
    */
-  protected void removePipeline(PipelineID pipelineId) throws IOException {
+  @Override
+  public void closePipeline(Pipeline pipeline, boolean onTimeout)
+      throws IOException {
+    PipelineID pipelineID = pipeline.getId();
     lock.writeLock().lock();
     try {
-      stateManager.removePipeline(pipelineId.getProtobuf());
-      metrics.incNumPipelineDestroyed();
-    } catch (IOException ex) {
-      metrics.incNumPipelineDestroyFailed();
-      throw ex;
+      if (!pipeline.isClosed()) {
+        stateManager.updatePipelineState(pipelineID.getProtobuf(),
+            HddsProtos.PipelineState.PIPELINE_CLOSED);
+        LOG.info("Pipeline {} moved to CLOSED state", pipeline);
+      }
+      metrics.removePipelineMetrics(pipelineID);
     } finally {
       lock.writeLock().unlock();
     }
+    // close containers.
+    closeContainersForPipeline(pipelineID);
+    if (!onTimeout) {
+      // close pipeline right away.
+      removePipeline(pipeline);
+    }
   }
 
+  /**
+   * Scrub pipelines.
+   * @param type Pipeline type
+   * @param factor Pipeline factor
+   * @throws IOException
+   */
   @Override
   public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
-      throws IOException{
+      throws IOException {
     if (type != ReplicationType.RATIS || factor != ReplicationFactor.THREE) {
       // Only srub pipeline for RATIS THREE pipeline
       return;
@@ -410,18 +401,29 @@ public final class PipelineManagerV2Impl implements PipelineManager {
         ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
         ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
         TimeUnit.MILLISECONDS);
-    List<Pipeline> needToSrubPipelines = stateManager.getPipelines(type, factor,
-        Pipeline.PipelineState.ALLOCATED).stream()
-        .filter(p -> currentTime.toEpochMilli() - p.getCreationTimestamp()
-            .toEpochMilli() >= pipelineScrubTimeoutInMills)
-        .collect(Collectors.toList());
-    for (Pipeline p : needToSrubPipelines) {
-      LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
-          " since it stays at ALLOCATED stage for " +
-          Duration.between(currentTime, p.getCreationTimestamp()).toMinutes() +
-          " mins.");
-      finalizeAndDestroyPipeline(p, false);
+
+    List<Pipeline> candidates = stateManager.getPipelines(type, factor);
+
+    for (Pipeline p : candidates) {
+      // scrub pipelines who stay ALLOCATED for too long.
+      if (p.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
+          (currentTime.toEpochMilli() - p.getCreationTimestamp()
+              .toEpochMilli() >= pipelineScrubTimeoutInMills)) {
+        LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
+            " since it stays at ALLOCATED stage for " +
+            Duration.between(currentTime, p.getCreationTimestamp())
+                .toMinutes() + " mins.");
+        closePipeline(p, false);
+      }
+      // scrub pipelines who stay CLOSED for too long.
+      if (p.getPipelineState() == Pipeline.PipelineState.CLOSED) {
+        LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
+            " since it stays at CLOSED stage.");
+        closeContainersForPipeline(p.getId());
+        removePipeline(p);
+      }
     }
+    return;
   }
 
   /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 50754db..365d8ee 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -382,33 +381,53 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   /**
-   * Finalizes pipeline in the SCM. Removes pipeline and makes rpc call to
-   * destroy pipeline on the datanodes immediately or after timeout based on the
-   * value of onTimeout parameter.
-   *
-   * @param pipeline        - Pipeline to be destroyed
-   * @param onTimeout       - if true pipeline is removed and destroyed on
-   *                        datanodes after timeout
+   * Fire events to close all containers related to the input pipeline.
+   * @param pipelineId - ID of the pipeline.
+   * @throws IOException
+   */
+  protected void closeContainersForPipeline(final PipelineID pipelineId)
+      throws IOException {
+    Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
+    for (ContainerID containerID : containerIDs) {
+      eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
+    }
+  }
+
+  /**
+   * put pipeline in CLOSED state.
+   * @param pipeline - ID of the pipeline.
+   * @param onTimeout - whether to remove pipeline after some time.
    * @throws IOException
    */
   @Override
-  public void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
+  public void closePipeline(Pipeline pipeline, boolean onTimeout)
       throws IOException {
-    LOG.info("Destroying pipeline:{}", pipeline);
-    finalizePipeline(pipeline.getId());
-    if (onTimeout) {
-      long pipelineDestroyTimeoutInMillis =
-          conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
-              ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
-              TimeUnit.MILLISECONDS);
-      scheduler.schedule(() -> destroyPipeline(pipeline),
-          pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG,
-          String.format("Destroy pipeline failed for pipeline:%s", pipeline));
-    } else {
-      destroyPipeline(pipeline);
+    PipelineID pipelineID = pipeline.getId();
+    lock.writeLock().lock();
+    try {
+      if (!pipeline.isClosed()) {
+        stateManager.updatePipelineState(pipelineID,
+            Pipeline.PipelineState.CLOSED);
+        LOG.info("Pipeline {} moved to CLOSED state", pipeline);
+      }
+      metrics.removePipelineMetrics(pipelineID);
+    } finally {
+      lock.writeLock().unlock();
+    }
+    // close containers.
+    closeContainersForPipeline(pipelineID);
+    if (!onTimeout) {
+      // close pipeline right away.
+      removePipeline(pipeline);
     }
   }
 
+  /**
+   * Scrub pipelines.
+   * @param type Pipeline type
+   * @param factor Pipeline factor
+   * @throws IOException
+   */
   @Override
   public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
       throws IOException{
@@ -421,18 +440,29 @@ public class SCMPipelineManager implements PipelineManager {
         ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
         ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
         TimeUnit.MILLISECONDS);
-    List<Pipeline> needToSrubPipelines = stateManager.getPipelines(type, factor,
-        Pipeline.PipelineState.ALLOCATED).stream()
-        .filter(p -> currentTime.toEpochMilli() - p.getCreationTimestamp()
-            .toEpochMilli() >= pipelineScrubTimeoutInMills)
-        .collect(Collectors.toList());
-    for (Pipeline p : needToSrubPipelines) {
-      LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
-          " since it stays at ALLOCATED stage for " +
-          Duration.between(currentTime, p.getCreationTimestamp()).toMinutes() +
-          " mins.");
-      finalizeAndDestroyPipeline(p, false);
+
+    List<Pipeline> candidates = stateManager.getPipelines(type, factor);
+
+    for (Pipeline p : candidates) {
+      // scrub pipelines who stay ALLOCATED for too long.
+      if (p.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
+          (currentTime.toEpochMilli() - p.getCreationTimestamp()
+              .toEpochMilli() >= pipelineScrubTimeoutInMills)) {
+        LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
+            " since it stays at ALLOCATED stage for " +
+            Duration.between(currentTime, p.getCreationTimestamp())
+                .toMinutes() + " mins.");
+        closePipeline(p, false);
+      }
+      // scrub pipelines who stay CLOSED for too long.
+      if (p.getPipelineState() == Pipeline.PipelineState.CLOSED) {
+        LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
+            " since it is at CLOSED stage.");
+        closeContainersForPipeline(p.getId());
+        removePipeline(p);
+      }
     }
+    return;
   }
 
   @Override
@@ -528,53 +558,20 @@ public class SCMPipelineManager implements PipelineManager {
   }
 
   /**
-   * Moves the pipeline to CLOSED state and sends close container command for
-   * all the containers in the pipeline.
-   *
-   * @param pipelineId - ID of the pipeline to be moved to CLOSED state.
-   * @throws IOException
-   */
-  private void finalizePipeline(PipelineID pipelineId) throws IOException {
-    lock.writeLock().lock();
-    try {
-      stateManager.finalizePipeline(pipelineId);
-      Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
-      for (ContainerID containerID : containerIDs) {
-        eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
-      }
-      metrics.removePipelineMetrics(pipelineId);
-    } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
-   * the datanodes for ratis pipelines.
-   *
-   * @param pipeline        - Pipeline to be destroyed
-   * @throws IOException
-   */
-  protected void destroyPipeline(Pipeline pipeline) throws IOException {
-    pipelineFactory.close(pipeline.getType(), pipeline);
-    // remove the pipeline from the pipeline manager
-    removePipeline(pipeline.getId());
-    triggerPipelineCreation();
-  }
-
-  /**
    * Removes the pipeline from the db and pipeline state map.
    *
-   * @param pipelineId - ID of the pipeline to be removed
+   * @param pipeline - pipeline to be removed
    * @throws IOException
    */
-  protected void removePipeline(PipelineID pipelineId) throws IOException {
+  protected void removePipeline(Pipeline pipeline) throws IOException {
+    pipelineFactory.close(pipeline.getType(), pipeline);
+    PipelineID pipelineID = pipeline.getId();
     lock.writeLock().lock();
     try {
       if (pipelineStore != null) {
-        pipelineStore.delete(pipelineId);
-        Pipeline pipeline = stateManager.removePipeline(pipelineId);
-        nodeManager.removePipeline(pipeline);
+        pipelineStore.delete(pipelineID);
+        Pipeline pipelineRemoved = stateManager.removePipeline(pipelineID);
+        nodeManager.removePipeline(pipelineRemoved);
         metrics.incNumPipelineDestroyed();
       }
     } catch (IOException ex) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index df5c147..ede679d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -475,7 +475,7 @@ public class SCMClientProtocolServer implements
     PipelineManager pipelineManager = scm.getPipelineManager();
     Pipeline pipeline =
         pipelineManager.getPipeline(PipelineID.getFromProtobuf(pipelineID));
-    pipelineManager.finalizeAndDestroyPipeline(pipeline, true);
+    pipelineManager.closePipeline(pipeline, true);
     AUDIT.logWriteSuccess(
         buildAuditMessageForSuccess(SCMAction.CLOSE_PIPELINE, null)
     );
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index c89065c..cf6f0ed 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -417,7 +417,7 @@ public class TestBlockManager {
   public void testBlockAllocationWithNoAvailablePipelines()
       throws IOException, TimeoutException, InterruptedException {
     for (Pipeline pipeline : pipelineManager.getPipelines()) {
-      pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+      pipelineManager.closePipeline(pipeline, false);
     }
     Assert.assertEquals(0, pipelineManager.getPipelines(type, factor).size());
     Assert.assertNotNull(blockManager
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 5dd6082..6292ad4 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -165,8 +165,7 @@ public final class MockPipelineManager implements PipelineManager {
   }
 
   @Override
-  public void finalizeAndDestroyPipeline(final Pipeline pipeline,
-                                         final boolean onTimeout)
+  public void closePipeline(final Pipeline pipeline, final boolean onTimeout)
       throws IOException {
     stateManager.finalizePipeline(pipeline.getId());
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index f8eeb6e..d5292e3 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -185,7 +185,7 @@ public class TestPipelineManagerImpl {
             Pipeline.PipelineState.OPEN).contains(pipeline));
 
     try {
-      pipelineManager.removePipeline(pipeline.getId());
+      pipelineManager.removePipeline(pipeline);
       fail();
     } catch (IOException ioe) {
       // Should not be able to remove the OPEN pipeline.
@@ -195,7 +195,7 @@ public class TestPipelineManagerImpl {
     }
 
     // Destroy pipeline
-    pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+    pipelineManager.closePipeline(pipeline, false);
     try {
       pipelineManager.getPipeline(pipeline.getId());
       fail("Pipeline should not have been retrieved");
@@ -238,7 +238,7 @@ public class TestPipelineManagerImpl {
         .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
 
     // close the pipeline
-    pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+    pipelineManager.closePipeline(pipeline, false);
 
     // pipeline report for destroyed pipeline should be ignored
     nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 7c2f17e..08c1a20 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -140,7 +140,7 @@ public class TestSCMPipelineManager {
 
     // clean up
     for (Pipeline pipeline : pipelines) {
-      pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+      pipelineManager.closePipeline(pipeline, false);
     }
     pipelineManager.close();
   }
@@ -163,7 +163,7 @@ public class TestSCMPipelineManager {
     pipelineManager.openPipeline(pipeline.getId());
     pipelineManager
         .addContainerToPipeline(pipeline.getId(), ContainerID.valueof(1));
-    pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+    pipelineManager.closePipeline(pipeline, false);
     pipelineManager.close();
 
     // new pipeline manager should not be able to load removed pipelines
@@ -228,7 +228,7 @@ public class TestSCMPipelineManager {
         .assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
 
     // close the pipeline
-    pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+    pipelineManager.closePipeline(pipeline, false);
 
     // pipeline report for destroyed pipeline should be ignored
     nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
index 42acb12..532f400 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestNode2PipelineMap.java
@@ -121,7 +121,7 @@ public class TestNode2PipelineMap {
     Assert.assertEquals(0, set2.size());
 
     pipelineManager
-        .finalizeAndDestroyPipeline(ratisContainer.getPipeline(), false);
+        .closePipeline(ratisContainer.getPipeline(), false);
     pipelines = scm.getScmNodeManager()
         .getPipelines(dns.get(0));
     Assert
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index 346fc0e..0ee0101 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -137,7 +137,7 @@ public class TestPipelineClose {
     Assert.assertEquals(0, setClosed.size());
 
     pipelineManager
-        .finalizeAndDestroyPipeline(ratisContainer.getPipeline(), false);
+        .closePipeline(ratisContainer.getPipeline(), false);
     for (DatanodeDetails dn : ratisContainer.getPipeline().getNodes()) {
       // Assert that the pipeline has been removed from Node2PipelineMap as well
       Assert.assertFalse(scm.getScmNodeManager().getPipelines(dn)
@@ -153,7 +153,7 @@ public class TestPipelineClose {
     Assert.assertEquals(1, setOpen.size());
 
     pipelineManager
-        .finalizeAndDestroyPipeline(ratisContainer.getPipeline(), false);
+        .closePipeline(ratisContainer.getPipeline(), false);
     GenericTestUtils.waitFor(() -> {
       try {
         return containerManager
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index bd677db..08a29f2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -86,7 +86,7 @@ public class TestRatisPipelineCreateAndDestroy {
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN);
     for (Pipeline pipeline : pipelines) {
-      pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+      pipelineManager.closePipeline(pipeline, false);
     }
     // make sure two pipelines are created
     waitForPipelines(2);
@@ -108,7 +108,7 @@ public class TestRatisPipelineCreateAndDestroy {
         .getPipelines(HddsProtos.ReplicationType.RATIS,
             HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN);
     for (Pipeline pipeline : pipelines) {
-      pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+      pipelineManager.closePipeline(pipeline, false);
     }
 
     // make sure two pipelines are created
@@ -152,7 +152,7 @@ public class TestRatisPipelineCreateAndDestroy {
 
     // destroy the existing pipelines
     for (Pipeline pipeline : pipelines) {
-      pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+      pipelineManager.closePipeline(pipeline, false);
     }
 
     if (cluster.getStorageContainerManager()
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
index 12ffce6..fab2ea3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestHelper.java
@@ -206,8 +206,8 @@ public final class TestHelper {
       throws TimeoutException, InterruptedException, IOException {
     for (Pipeline pipeline1 : pipelineList) {
       // issue pipeline destroy command
-      cluster.getStorageContainerManager().getPipelineManager()
-          .finalizeAndDestroyPipeline(pipeline1, false);
+      cluster.getStorageContainerManager()
+          .getPipelineManager().closePipeline(pipeline1, false);
     }
 
     // wait for the pipeline to get destroyed in the datanodes
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index bb2d57f..6b40179 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -204,7 +204,7 @@ public class TestCloseContainerByPipeline {
     Assert.assertTrue(isContainerClosed(cluster, containerID, datanodeDetails));
 
     cluster.getStorageContainerManager().getPipelineManager()
-        .finalizeAndDestroyPipeline(pipeline, false);
+        .closePipeline(pipeline, false);
     Thread.sleep(5000);
     // Pipeline close should not affect a container in CLOSED state
     Assert.assertTrue(isContainerClosed(cluster, containerID, datanodeDetails));
@@ -307,7 +307,7 @@ public class TestCloseContainerByPipeline {
 
     // close the pipeline
     cluster.getStorageContainerManager()
-        .getPipelineManager().finalizeAndDestroyPipeline(pipeline, false);
+        .getPipelineManager().closePipeline(pipeline, false);
 
     // All the containers in OPEN or CLOSING state should transition to
     // QUASI-CLOSED after pipeline close
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
index fa25361..feb03ec 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithDatanodeRestart.java
@@ -85,6 +85,7 @@ public class TestFreonWithDatanodeRestart {
             RatisHelper.HDDS_DATANODE_RATIS_CLIENT_PREFIX_KEY+ "." +
                     "watch.request.timeout",
             3, TimeUnit.SECONDS);
+    conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 5);
     cluster = MiniOzoneCluster.newBuilder(conf)
       .setHbProcessorInterval(1000)
       .setHbInterval(1000)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
index 5150fd4..109a49e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
@@ -105,6 +105,6 @@ public class TestFreonWithPipelineDestroy {
     PipelineManager pipelineManager =
         cluster.getStorageContainerManager().getPipelineManager();
     Pipeline pipeline = pipelineManager.getPipeline(id);
-    pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
+    pipelineManager.closePipeline(pipeline, false);
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java
index 6e3dfe3..9092cc5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/recon/TestReconAsPassiveScm.java
@@ -181,7 +181,7 @@ public class TestReconAsPassiveScm {
         .filter(p -> !p.getId().equals(containerInfo.getPipelineID()))
         .findFirst();
     assertTrue(pipelineToClose.isPresent());
-    scmPipelineManager.finalizeAndDestroyPipeline(pipelineToClose.get(), false);
+    scmPipelineManager.closePipeline(pipelineToClose.get(), false);
 
     // Start Recon
     cluster.startRecon();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
index 4025aca..5edd392 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
@@ -124,7 +124,7 @@ public class TestSCMContainerPlacementPolicyMetrics {
         .collect(Collectors.toList());
     Pipeline targetPipeline = pipelines.get(0);
     List<DatanodeDetails> nodes = targetPipeline.getNodes();
-    manager.finalizeAndDestroyPipeline(pipelines.get(0), true);
+    manager.closePipeline(pipelines.get(0), true);
 
     // kill datanode to trigger under-replicated container replication
     cluster.shutdownHddsDatanode(nodes.get(0));
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java
index 250a2b0..a1a816d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/pipeline/TestSCMPipelineMetrics.java
@@ -92,8 +92,7 @@ public class TestSCMPipelineMetrics {
     try {
       cluster.getStorageContainerManager()
           .getPipelineManager()
-          .finalizeAndDestroyPipeline(
-              pipeline.get(), false);
+          .closePipeline(pipeline.get(), false);
     } catch (IOException e) {
       e.printStackTrace();
       Assert.fail();
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
index beed591..a96212d 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
@@ -62,13 +62,6 @@ public class ReconPipelineManager extends SCMPipelineManager {
     // Don't do anything in Recon.
   }
 
-  @Override
-  protected void destroyPipeline(Pipeline pipeline) throws IOException {
-    // remove the pipeline from the pipeline manager
-    removePipeline(pipeline.getId());
-  }
-
-
   /**
    * Bootstrap Recon's pipeline metadata with that from SCM.
    * @param pipelinesFromScm pipelines from SCM.
@@ -124,7 +117,7 @@ public class ReconPipelineManager extends SCMPipelineManager {
         }
         try {
           LOG.info("Removing invalid pipeline {} from Recon.", pipelineID);
-          finalizeAndDestroyPipeline(p, false);
+          closePipeline(p, false);
         } catch (IOException e) {
           LOG.warn("Unable to remove pipeline {}", pipelineID, e);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org