Posted to commits@ozone.apache.org by li...@apache.org on 2020/06/10 08:26:38 UTC

[hadoop-ozone] 07/09: HDDS-3196 New PipelineManager interface to persist to RatisServer. (#980)

This is an automated email from the ASF dual-hosted git repository.

licheng pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit c8367208276b4137261992f3ff395f22e7db34b0
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Tue Jun 2 02:18:37 2020 +0800

    HDDS-3196 New PipelineManager interface to persist to RatisServer. (#980)
---
 .../hdds/scm/pipeline/PipelineManagerV2Impl.java   | 617 +++++++++++++++++++++
 .../hdds/scm/pipeline/PipelineStateManagerV2.java  |  99 ++++
 .../scm/pipeline/PipelineStateManagerV2Impl.java   | 226 ++++++++
 .../hdds/scm/pipeline/RatisPipelineUtils.java      |  19 +
 4 files changed, 961 insertions(+)
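
For context: PipelineManagerV2Impl routes every pipeline mutation through a
PipelineStateManagerV2 proxy that replicates the call via the SCM Ratis server
before applying it locally. A minimal wiring sketch follows (the factory call
matches the signature added below; the surrounding variables conf,
scmhaManager, nodeManager, pipelineStore and pipelineFactory are assumed to
come from SCM bootstrap code, not from this commit):

    // Sketch only; the bootstrap variables are assumed, not part of this diff.
    PipelineManager pipelineManager = PipelineManagerV2Impl.newPipelineManager(
        conf, scmhaManager, nodeManager, pipelineStore, pipelineFactory);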

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
new file mode 100644
index 0000000..a6a3249
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -0,0 +1,617 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.utils.Scheduler;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * SCM Pipeline Manager implementation.
+ * All the write operations for pipelines must come via PipelineManager.
+ * It synchronises all write and read operations via a ReadWriteLock.
+ */
+public class PipelineManagerV2Impl implements PipelineManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineManagerV2Impl.class);
+
+  private final ReadWriteLock lock;
+  private PipelineFactory pipelineFactory;
+  private PipelineStateManagerV2 stateManager;
+  private Scheduler scheduler;
+  private BackgroundPipelineCreator backgroundPipelineCreator;
+  private final ConfigurationSource conf;
+  // Pipeline Manager MXBean
+  private ObjectName pmInfoBean;
+  private final SCMPipelineMetrics metrics;
+  private long pipelineWaitDefaultTimeout;
+  private final AtomicBoolean isInSafeMode;
+  // Used to track whether the safemode pre-checks have completed. This is
+  // designed to prevent pipelines from being created until sufficient nodes
+  // have registered.
+  private final AtomicBoolean pipelineCreationAllowed;
+
+  public PipelineManagerV2Impl(ConfigurationSource conf,
+                               NodeManager nodeManager,
+                               PipelineStateManagerV2 pipelineStateManager,
+                               PipelineFactory pipelineFactory) {
+    this.lock = new ReentrantReadWriteLock();
+    this.pipelineFactory = pipelineFactory;
+    this.stateManager = pipelineStateManager;
+    this.conf = conf;
+    this.pmInfoBean = MBeans.register("SCMPipelineManager",
+        "SCMPipelineManagerInfo", this);
+    this.metrics = SCMPipelineMetrics.create();
+    this.pipelineWaitDefaultTimeout = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
+        HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    this.isInSafeMode = new AtomicBoolean(conf.getBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED,
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT));
+    // Pipeline creation is only allowed after the safemode prechecks have
+    // passed, e.g. sufficient nodes have registered.
+    this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get());
+  }
+
+  public static PipelineManager newPipelineManager(
+      ConfigurationSource conf, SCMHAManager scmhaManager,
+      NodeManager nodeManager, Table<PipelineID, Pipeline> pipelineStore,
+      PipelineFactory pipelineFactory) throws IOException {
+    // Create PipelineStateManager
+    PipelineStateManagerV2 stateManager = PipelineStateManagerV2Impl
+        .newBuilder().setPipelineStore(pipelineStore)
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setNodeManager(nodeManager)
+        .build();
+
+    // Create PipelineManager
+    PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf,
+        nodeManager, stateManager, pipelineFactory);
+
+    // Create background thread.
+    Scheduler scheduler = new Scheduler(
+        "RatisPipelineUtilsThread", false, 1);
+    BackgroundPipelineCreator backgroundPipelineCreator =
+        new BackgroundPipelineCreator(pipelineManager, scheduler, conf);
+    pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
+    pipelineManager.setScheduler(scheduler);
+
+    return pipelineManager;
+  }
+
+  @Override
+  public Pipeline createPipeline(ReplicationType type,
+                                 ReplicationFactor factor) throws IOException {
+    if (!isPipelineCreationAllowed() && factor != ReplicationFactor.ONE) {
+      LOG.debug("Pipeline creation is not allowed until safe mode prechecks " +
+          "complete");
+      throw new IOException("Pipeline creation is not allowed as safe mode " +
+          "prechecks have not yet passed");
+    }
+    lock.writeLock().lock();
+    try {
+      Pipeline pipeline = pipelineFactory.create(type, factor);
+      stateManager.addPipeline(pipeline.getProtobufMessage());
+      recordMetricsForPipeline(pipeline);
+      return pipeline;
+    } catch (IOException ex) {
+      LOG.error("Failed to create pipeline of type {} and factor {}.",
+          type, factor, ex);
+      metrics.incNumPipelineCreationFailed();
+      throw ex;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
+                          List<DatanodeDetails> nodes) {
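+    // Note: creating a pipeline from an explicit node list is left as a stub
+    // in this commit and simply returns null.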
+    return null;
+  }
+
+  @Override
+  public Pipeline getPipeline(PipelineID pipelineID)
+      throws PipelineNotFoundException {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipeline(pipelineID);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public boolean containsPipeline(PipelineID pipelineID) {
+    lock.readLock().lock();
+    try {
+      getPipeline(pipelineID);
+      return true;
+    } catch (PipelineNotFoundException e) {
+      return false;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public List<Pipeline> getPipelines() {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines();
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines(type);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+                              ReplicationFactor factor) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines(type, factor);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+                              Pipeline.PipelineState state) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines(type, state);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+                              ReplicationFactor factor,
+                              Pipeline.PipelineState state) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines(type, factor, state);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(
+      ReplicationType type, ReplicationFactor factor,
+      Pipeline.PipelineState state, Collection<DatanodeDetails> excludeDns,
+      Collection<PipelineID> excludePipelines) {
+    lock.readLock().lock();
+    try {
+      return stateManager
+          .getPipelines(type, factor, state, excludeDns, excludePipelines);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void addContainerToPipeline(
+      PipelineID pipelineID, ContainerID containerID) throws IOException {
+    lock.writeLock().lock();
+    try {
+      stateManager.addContainerToPipeline(pipelineID, containerID);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void removeContainerFromPipeline(
+      PipelineID pipelineID, ContainerID containerID) throws IOException {
+    lock.writeLock().lock();
+    try {
+      stateManager.removeContainerFromPipeline(pipelineID, containerID);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public NavigableSet<ContainerID> getContainersInPipeline(
+      PipelineID pipelineID) throws IOException {
+    lock.readLock().lock();
+    try {
+      return stateManager.getContainers(pipelineID);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
+  public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
+    return stateManager.getNumberOfContainers(pipelineID);
+  }
+
+  @Override
+  public void openPipeline(PipelineID pipelineId) throws IOException {
+    lock.writeLock().lock();
+    try {
+      Pipeline pipeline = stateManager.getPipeline(pipelineId);
+      if (pipeline.isClosed()) {
+        throw new IOException("Closed pipeline cannot be opened");
+      }
+      if (pipeline.getPipelineState() == Pipeline.PipelineState.ALLOCATED) {
+        LOG.info("Pipeline {} moved to OPEN state", pipeline);
+        stateManager.updatePipelineState(pipelineId.getProtobuf(),
+            HddsProtos.PipelineState.PIPELINE_OPEN);
+      }
+      metrics.incNumPipelineCreated();
+      metrics.createPerPipelineMetrics(pipeline);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Finalizes the pipeline in the SCM, removes it, and makes an RPC call to
+   * destroy the pipeline on the datanodes, either immediately or after a
+   * timeout, depending on the value of the onTimeout parameter.
+   *
+   * @param pipeline        - Pipeline to be destroyed
+   * @param onTimeout       - if true, the pipeline is removed and destroyed on
+   *                        the datanodes only after a timeout
+   * @throws IOException
+   */
+  @Override
+  public void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
+      throws IOException {
+    LOG.info("Destroying pipeline:{}", pipeline);
+    finalizePipeline(pipeline.getId());
+    if (onTimeout) {
+      long pipelineDestroyTimeoutInMillis =
+          conf.getTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
+              ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
+              TimeUnit.MILLISECONDS);
+      scheduler.schedule(() -> destroyPipeline(pipeline),
+          pipelineDestroyTimeoutInMillis, TimeUnit.MILLISECONDS, LOG,
+          String.format("Destroy pipeline failed for pipeline:%s", pipeline));
+    } else {
+      destroyPipeline(pipeline);
+    }
+  }
+
+  /**
+   * Moves the pipeline to CLOSED state and sends close container command for
+   * all the containers in the pipeline.
+   *
+   * @param pipelineId - ID of the pipeline to be moved to CLOSED state.
+   * @throws IOException
+   */
+  private void finalizePipeline(PipelineID pipelineId) throws IOException {
+    lock.writeLock().lock();
+    try {
+      Pipeline pipeline = stateManager.getPipeline(pipelineId);
+      if (!pipeline.isClosed()) {
+        stateManager.updatePipelineState(pipelineId.getProtobuf(),
+            HddsProtos.PipelineState.PIPELINE_CLOSED);
+        LOG.info("Pipeline {} moved to CLOSED state", pipeline);
+      }
+
+      // TODO fire events to datanodes for closing pipelines
+//      Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
+//      for (ContainerID containerID : containerIDs) {
+//        eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
+//      }
+      metrics.removePipelineMetrics(pipelineId);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
+   * the datanodes for ratis pipelines.
+   *
+   * @param pipeline        - Pipeline to be destroyed
+   * @throws IOException
+   */
+  protected void destroyPipeline(Pipeline pipeline) throws IOException {
+    pipelineFactory.close(pipeline.getType(), pipeline);
+    // remove the pipeline from the pipeline manager
+    removePipeline(pipeline.getId());
+    triggerPipelineCreation();
+  }
+
+  /**
+   * Removes the pipeline from the db and pipeline state map.
+   *
+   * @param pipelineId - ID of the pipeline to be removed
+   * @throws IOException
+   */
+  protected void removePipeline(PipelineID pipelineId) throws IOException {
+    lock.writeLock().lock();
+    try {
+      stateManager.removePipeline(pipelineId.getProtobuf());
+      metrics.incNumPipelineDestroyed();
+    } catch (IOException ex) {
+      metrics.incNumPipelineDestroyFailed();
+      throw ex;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  @Override
+  public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
+      throws IOException {
+    if (type != ReplicationType.RATIS || factor != ReplicationFactor.THREE) {
+      // Only scrub RATIS THREE pipelines.
+      return;
+    }
+    Instant currentTime = Instant.now();
+    long pipelineScrubTimeoutInMillis = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    List<Pipeline> needToScrubPipelines = stateManager
+        .getPipelines(type, factor, Pipeline.PipelineState.ALLOCATED).stream()
+        .filter(p -> currentTime.toEpochMilli() - p.getCreationTimestamp()
+            .toEpochMilli() >= pipelineScrubTimeoutInMillis)
+        .collect(Collectors.toList());
+    for (Pipeline p : needToScrubPipelines) {
+      LOG.info("Scrubbing pipeline {} since it has stayed in the ALLOCATED " +
+          "state for {} mins.", p.getId(),
+          Duration.between(p.getCreationTimestamp(), currentTime).toMinutes());
+      finalizeAndDestroyPipeline(p, false);
+    }
+  }
+
+  /**
+   * Schedules a fixed interval job to create pipelines.
+   */
+  @Override
+  public void startPipelineCreator() {
+    backgroundPipelineCreator.startFixedIntervalPipelineCreator();
+  }
+
+  /**
+   * Triggers pipeline creation after the specified time.
+   */
+  @Override
+  public void triggerPipelineCreation() {
+    backgroundPipelineCreator.triggerPipelineCreation();
+  }
+
+  @Override
+  public void incNumBlocksAllocatedMetric(PipelineID id) {
+    metrics.incNumBlocksAllocated(id);
+  }
+
+  /**
+   * Activates a dormant pipeline.
+   *
+   * @param pipelineID ID of the pipeline to activate.
+   * @throws IOException in case of any Exception
+   */
+  @Override
+  public void activatePipeline(PipelineID pipelineID)
+      throws IOException {
+    stateManager.updatePipelineState(pipelineID.getProtobuf(),
+        HddsProtos.PipelineState.PIPELINE_OPEN);
+  }
+
+  /**
+   * Deactivates an active pipeline.
+   *
+   * @param pipelineID ID of the pipeline to deactivate.
+   * @throws IOException in case of any Exception
+   */
+  @Override
+  public void deactivatePipeline(PipelineID pipelineID)
+      throws IOException {
+    stateManager.updatePipelineState(pipelineID.getProtobuf(),
+        HddsProtos.PipelineState.PIPELINE_DORMANT);
+  }
+
+  /**
+   * Waits for a pipeline to become OPEN.
+   *
+   * @param pipelineID ID of the pipeline to wait for.
+   * @param timeout    wait timeout in milliseconds; 0 to use the default value
+   * @throws IOException in case of any Exception, such as a timeout
+   */
+  @Override
+  public void waitPipelineReady(PipelineID pipelineID, long timeout)
+      throws IOException {
+    long st = Time.monotonicNow();
+    if (timeout == 0) {
+      timeout = pipelineWaitDefaultTimeout;
+    }
+
+    boolean ready;
+    Pipeline pipeline;
+    do {
+      try {
+        pipeline = stateManager.getPipeline(pipelineID);
+      } catch (PipelineNotFoundException e) {
+        throw new PipelineNotFoundException(String.format(
+            "Pipeline %s cannot be found", pipelineID));
+      }
+      ready = pipeline.isOpen();
+      if (!ready) {
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    } while (!ready && Time.monotonicNow() - st < timeout);
+
+    if (!ready) {
+      throw new IOException(String.format("Pipeline %s is not ready in %d ms",
+          pipelineID, timeout));
+    }
+  }
+
+  @Override
+  public Map<String, Integer> getPipelineInfo() {
+    final Map<String, Integer> pipelineInfo = new HashMap<>();
+    for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
+      pipelineInfo.put(state.toString(), 0);
+    }
+    stateManager.getPipelines().forEach(pipeline ->
+        pipelineInfo.computeIfPresent(
+            pipeline.getPipelineState().toString(), (k, v) -> v + 1));
+    return pipelineInfo;
+  }
+
+  /**
+   * Get SafeMode status.
+   * @return boolean
+   */
+  @Override
+  public boolean getSafeModeStatus() {
+    return this.isInSafeMode.get();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (scheduler != null) {
+      scheduler.close();
+      scheduler = null;
+    }
+
+    if (pmInfoBean != null) {
+      MBeans.unregister(this.pmInfoBean);
+      pmInfoBean = null;
+    }
+
+    SCMPipelineMetrics.unRegister();
+
+    // shutdown pipeline provider.
+    pipelineFactory.shutdown();
+  }
+
+  @Override
+  public void onMessage(SCMSafeModeManager.SafeModeStatus status,
+                        EventPublisher publisher) {
+    // TODO: #CLUTIL - handle safemode getting re-enabled
+    boolean currentAllowPipelines =
+        pipelineCreationAllowed.getAndSet(status.isPreCheckComplete());
+    boolean currentlyInSafeMode =
+        isInSafeMode.getAndSet(status.isInSafeMode());
+
+    // Trigger pipeline creation only if the preCheck status has changed to
+    // complete.
+    if (isPipelineCreationAllowed() && !currentAllowPipelines) {
+      triggerPipelineCreation();
+    }
+    // Start the pipeline creation thread only when safemode switches off
+    if (!getSafeModeStatus() && currentlyInSafeMode) {
+      startPipelineCreator();
+    }
+  }
+
+  @VisibleForTesting
+  public boolean isPipelineCreationAllowed() {
+    return pipelineCreationAllowed.get();
+  }
+
+  private void setBackgroundPipelineCreator(
+      BackgroundPipelineCreator backgroundPipelineCreator) {
+    this.backgroundPipelineCreator = backgroundPipelineCreator;
+  }
+
+  private void setScheduler(Scheduler scheduler) {
+    this.scheduler = scheduler;
+  }
+
+  private void recordMetricsForPipeline(Pipeline pipeline) {
+    metrics.incNumPipelineAllocated();
+    if (pipeline.isOpen()) {
+      metrics.incNumPipelineCreated();
+      metrics.createPerPipelineMetrics(pipeline);
+    }
+    switch (pipeline.getType()) {
+    case STAND_ALONE:
+      return;
+    case RATIS:
+      List<Pipeline> overlapPipelines = RatisPipelineUtils
+          .checkPipelineContainSameDatanodes(stateManager, pipeline);
+      if (!overlapPipelines.isEmpty()) {
+        // Count 1 overlap at a time.
+        metrics.incNumPipelineContainSameDatanodes();
+        //TODO remove until pipeline allocation is proved equally distributed.
+        for (Pipeline overlapPipeline : overlapPipelines) {
+          LOG.info("Pipeline: {} contains the same datanodes as previous " +
+              "pipeline: {}, nodeIds: {}",
+              pipeline.getId(), overlapPipeline.getId(),
+              pipeline.getNodes().stream()
+                  .map(DatanodeDetails::getUuid)
+                  .collect(Collectors.toList()));
+        }
+      }
+      return;
+    case CHAINED:
+      // Not supported.
+    default:
+      // Not supported.
+      return;
+    }
+  }
+}
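
A hedged usage sketch for the manager above (only the PipelineManager calls
mirror this file; in practice the ALLOCATED-to-OPEN transition is driven by
datanode pipeline reports rather than invoked inline like this):

    // Sketch, assuming an already-wired pipelineManager.
    Pipeline pipeline = pipelineManager.createPipeline(
        ReplicationType.RATIS, ReplicationFactor.THREE);    // starts ALLOCATED
    pipelineManager.openPipeline(pipeline.getId());         // ALLOCATED -> OPEN
    pipelineManager.waitPipelineReady(pipeline.getId(), 0); // 0 = default wait
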
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java
new file mode 100644
index 0000000..4021575
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NavigableSet;
+
+/**
+ * Manages the state of pipelines in SCM.
+ */
+public interface PipelineStateManagerV2 {
+
+  /**
+   * Adds a pipeline; the mutation is replicated to Ratis.
+   * @param pipelineProto protobuf message of the pipeline to add
+   * @throws IOException
+   */
+  @Replicate
+  void addPipeline(HddsProtos.Pipeline pipelineProto) throws IOException;
+
+  /**
+   * Removes a pipeline; the mutation is replicated to Ratis.
+   * @param pipelineIDProto protobuf message of the ID of the pipeline to
+   *                        remove
+   * @throws IOException
+   */
+  @Replicate
+  void removePipeline(HddsProtos.PipelineID pipelineIDProto)
+      throws IOException;
+
+  /**
+   * Updates a pipeline's state; the mutation is replicated to Ratis.
+   * @param pipelineIDProto protobuf message of the ID of the pipeline to
+   *                        update
+   * @param newState the new state for the pipeline
+   * @throws IOException
+   */
+  @Replicate
+  void updatePipelineState(HddsProtos.PipelineID pipelineIDProto,
+                           HddsProtos.PipelineState newState)
+      throws IOException;
+
+  void addContainerToPipeline(PipelineID pipelineID,
+                              ContainerID containerID) throws IOException;
+
+  Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException;
+
+  List<Pipeline> getPipelines();
+
+  List<Pipeline> getPipelines(HddsProtos.ReplicationType type);
+
+  List<Pipeline> getPipelines(HddsProtos.ReplicationType type,
+                              HddsProtos.ReplicationFactor factor);
+
+  List<Pipeline> getPipelines(HddsProtos.ReplicationType type,
+                              HddsProtos.ReplicationFactor factor,
+                              Pipeline.PipelineState state);
+
+  List<Pipeline> getPipelines(HddsProtos.ReplicationType type,
+                              HddsProtos.ReplicationFactor factor,
+                              Pipeline.PipelineState state,
+                              Collection<DatanodeDetails> excludeDns,
+                              Collection<PipelineID> excludePipelines);
+
+  List<Pipeline> getPipelines(HddsProtos.ReplicationType type,
+                              Pipeline.PipelineState... states);
+
+  NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
+      throws IOException;
+
+  int getNumberOfContainers(PipelineID pipelineID) throws IOException;
+
+  void removeContainerFromPipeline(PipelineID pipelineID,
+                                   ContainerID containerID) throws IOException;
+
+  void close() throws Exception;
+}
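
The @Replicate annotation marks the calls that mutate pipeline state. The
assumed flow (based on the dynamic proxy built in PipelineStateManagerV2Impl
below) is that each annotated call is submitted to the SCM Ratis server and
applied locally only once it commits:

    // Assumed flow for a replicated mutation (sketch, not actual code):
    // caller -> proxy.addPipeline(proto)
    //        -> SCMHAInvocationHandler.invoke(...)
    //        -> SCMRatisServer submits the request and waits for commit
    //        -> PipelineStateManagerV2Impl.addPipeline(proto) runs on each SCM
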
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
new file mode 100644
index 0000000..c74dc86
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Proxy;
+import java.util.Collection;
+import java.util.List;
+import java.util.NavigableSet;
+
+/**
+ * Implementation of pipeline state manager.
+ * PipelineStateMap class holds the data structures related to pipeline and its
+ * state. All the read and write operations in PipelineStateMap are protected
+ * by a read write lock.
+ */
+public class PipelineStateManagerV2Impl implements PipelineStateManagerV2 {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineStateManagerV2Impl.class);
+
+  private final PipelineStateMap pipelineStateMap;
+  private final NodeManager nodeManager;
+  private Table<PipelineID, Pipeline> pipelineStore;
+
+  public PipelineStateManagerV2Impl(
+      Table<PipelineID, Pipeline> pipelineStore, NodeManager nodeManager)
+      throws IOException {
+    this.pipelineStateMap = new PipelineStateMap();
+    this.nodeManager = nodeManager;
+    this.pipelineStore = pipelineStore;
+    initialize();
+  }
+
+  private void initialize() throws IOException {
+    if (pipelineStore == null || nodeManager == null) {
+      throw new IOException("PipelineStore and NodeManager cannot be null");
+    }
+    if (pipelineStore.isEmpty()) {
+      LOG.info("No pipelines exist in the current db.");
+      return;
+    }
+    TableIterator<PipelineID, ? extends Table.KeyValue<PipelineID, Pipeline>>
+        iterator = pipelineStore.iterator();
+    while (iterator.hasNext()) {
+      Pipeline pipeline = iterator.next().getValue();
+      addPipeline(pipeline.getProtobufMessage());
+    }
+  }
+
+  @Override
+  public void addPipeline(HddsProtos.Pipeline pipelineProto)
+      throws IOException {
+    Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto);
+    pipelineStore.put(pipeline.getId(), pipeline);
+    pipelineStateMap.addPipeline(pipeline);
+    nodeManager.addPipeline(pipeline);
+    LOG.info("Created pipeline {}.", pipeline);
+  }
+
+  @Override
+  public void addContainerToPipeline(
+      PipelineID pipelineId, ContainerID containerID)
+      throws IOException {
+    pipelineStateMap.addContainerToPipeline(pipelineId, containerID);
+  }
+
+  @Override
+  public Pipeline getPipeline(PipelineID pipelineID)
+      throws PipelineNotFoundException {
+    return pipelineStateMap.getPipeline(pipelineID);
+  }
+
+  @Override
+  public List<Pipeline> getPipelines() {
+    return pipelineStateMap.getPipelines();
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(HddsProtos.ReplicationType type) {
+    return pipelineStateMap.getPipelines(type);
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(
+      HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor) {
+    return pipelineStateMap.getPipelines(type, factor);
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(
+      HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
+      Pipeline.PipelineState state) {
+    return pipelineStateMap.getPipelines(type, factor, state);
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(
+      HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
+      Pipeline.PipelineState state, Collection<DatanodeDetails> excludeDns,
+      Collection<PipelineID> excludePipelines) {
+    return pipelineStateMap
+        .getPipelines(type, factor, state, excludeDns, excludePipelines);
+  }
+
+  @Override
+  public List<Pipeline> getPipelines(HddsProtos.ReplicationType type,
+                                     Pipeline.PipelineState... states) {
+    return pipelineStateMap.getPipelines(type, states);
+  }
+
+  @Override
+  public NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
+      throws IOException {
+    return pipelineStateMap.getContainers(pipelineID);
+  }
+
+  @Override
+  public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
+    return pipelineStateMap.getNumberOfContainers(pipelineID);
+  }
+
+  @Override
+  public void removePipeline(HddsProtos.PipelineID pipelineIDProto)
+      throws IOException {
+    PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto);
+    pipelineStore.delete(pipelineID);
+    Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID);
+    nodeManager.removePipeline(pipeline);
+    LOG.info("Pipeline {} removed.", pipeline);
+  }
+
+  @Override
+  public void removeContainerFromPipeline(
+      PipelineID pipelineID, ContainerID containerID) throws IOException {
+    pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
+  }
+
+  @Override
+  public void updatePipelineState(
+      HddsProtos.PipelineID pipelineIDProto, HddsProtos.PipelineState newState)
+      throws IOException {
+    pipelineStateMap.updatePipelineState(
+        PipelineID.getFromProtobuf(pipelineIDProto),
+        Pipeline.PipelineState.fromProtobuf(newState));
+  }
+
+  @Override
+  public void close() throws Exception {
+    pipelineStore.close();
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * Builder for PipelineStateManagerV2Impl.
+   */
+  public static class Builder {
+    private Table<PipelineID, Pipeline> pipelineStore;
+    private NodeManager nodeManager;
+    private SCMRatisServer scmRatisServer;
+
+    public Builder setRatisServer(final SCMRatisServer ratisServer) {
+      scmRatisServer = ratisServer;
+      return this;
+    }
+
+    public Builder setNodeManager(final NodeManager scmNodeManager) {
+      nodeManager = scmNodeManager;
+      return this;
+    }
+
+    public Builder setPipelineStore(
+        final Table<PipelineID, Pipeline> pipelineTable) {
+      this.pipelineStore = pipelineTable;
+      return this;
+    }
+
+    public PipelineStateManagerV2 build() throws IOException {
+      Preconditions.checkNotNull(pipelineStore);
+
+      final PipelineStateManagerV2 pipelineStateManager =
+          new PipelineStateManagerV2Impl(pipelineStore, nodeManager);
+
+      final SCMHAInvocationHandler invocationHandler =
+          new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.PIPELINE,
+              pipelineStateManager, scmRatisServer);
+
+      return (PipelineStateManagerV2) Proxy.newProxyInstance(
+          SCMHAInvocationHandler.class.getClassLoader(),
+          new Class<?>[]{PipelineStateManagerV2.class}, invocationHandler);
+    }
+  }
+}
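
For reference, constructing the replicated state manager through the builder
looks like the snippet below; it mirrors the call in
PipelineManagerV2Impl.newPipelineManager above, with scmhaManager assumed:

    PipelineStateManagerV2 stateManager = PipelineStateManagerV2Impl
        .newBuilder()
        .setPipelineStore(pipelineStore)               // Table<PipelineID, Pipeline>
        .setRatisServer(scmhaManager.getRatisServer())
        .setNodeManager(nodeManager)
        .build(); // returns a dynamic proxy, not the raw implementation
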
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 5c9b202..edc40af 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -117,4 +117,23 @@ public final class RatisPipelineUtils {
                 p.sameDatanodes(pipeline)))
         .collect(Collectors.toList());
   }
+
+  /**
+   * Returns the list of pipelines that share the same set of datanodes
+   * as the input pipeline.
+   *
+   * @param stateManager PipelineStateManagerV2
+   * @param pipeline input pipeline
+   * @return list of matching pipelines
+   */
+  static List<Pipeline> checkPipelineContainSameDatanodes(
+      PipelineStateManagerV2 stateManager, Pipeline pipeline) {
+    return stateManager.getPipelines(
+        HddsProtos.ReplicationType.RATIS,
+        HddsProtos.ReplicationFactor.THREE)
+        .stream().filter(p -> !p.getId().equals(pipeline.getId()) &&
+            (p.getPipelineState() != Pipeline.PipelineState.CLOSED &&
+                p.sameDatanodes(pipeline)))
+        .collect(Collectors.toList());
+  }
 }
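
A short usage sketch for the new helper; it mirrors recordMetricsForPipeline
in PipelineManagerV2Impl above, so only the names there are assumed:

    List<Pipeline> overlapPipelines = RatisPipelineUtils
        .checkPipelineContainSameDatanodes(stateManager, pipeline);
    if (!overlapPipelines.isEmpty()) {
      metrics.incNumPipelineContainSameDatanodes();
    }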

