You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by na...@apache.org on 2020/06/01 18:18:48 UTC
[hadoop-ozone] branch HDDS-2823 updated: HDDS-3196 New
PipelineManager interface to persist to RatisServer. (#980)
This is an automated email from the ASF dual-hosted git repository.
nanda pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new 723d23c HDDS-3196 New PipelineManager interface to persist to RatisServer. (#980)
723d23c is described below
commit 723d23c0a332a7bd8f39b29ba7a558cea199d93b
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Tue Jun 2 02:18:37 2020 +0800
HDDS-3196 New PipelineManager interface to persist to RatisServer. (#980)
---
.../hdds/scm/pipeline/PipelineManagerV2Impl.java | 617 +++++++++++++++++++++
.../hdds/scm/pipeline/PipelineStateManagerV2.java | 99 ++++
.../scm/pipeline/PipelineStateManagerV2Impl.java | 226 ++++++++
.../hdds/scm/pipeline/RatisPipelineUtils.java | 19 +
4 files changed, 961 insertions(+)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
new file mode 100644
index 0000000..a6a3249
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -0,0 +1,617 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.utils.Scheduler;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+/**
+ * SCM Pipeline Manager implementation.
+ * All the write operations for pipelines must come via PipelineManager.
+ * It synchronises all write and read operations via a ReadWriteLock.
+ */
+public class PipelineManagerV2Impl implements PipelineManager {
+  // Logger is keyed on this class so its messages are attributed correctly.
+  private static final Logger LOG =
+      LoggerFactory.getLogger(PipelineManagerV2Impl.class);
+
+  // Read/write lock taken by the operations of this class around their calls
+  // into the state manager.
+  private final ReadWriteLock lock;
+  private PipelineFactory pipelineFactory;
+  private PipelineStateManagerV2 stateManager;
+  // Assigned after construction by newPipelineManager(); cleared in close().
+  private Scheduler scheduler;
+  // Assigned after construction by newPipelineManager().
+  private BackgroundPipelineCreator backgroundPipelineCreator;
+  private final ConfigurationSource conf;
+  // Pipeline Manager MXBean
+  private ObjectName pmInfoBean;
+  private final SCMPipelineMetrics metrics;
+  // Timeout in milliseconds used by waitPipelineReady() when the caller
+  // passes 0.
+  private long pipelineWaitDefaultTimeout;
+  private final AtomicBoolean isInSafeMode;
+  // Used to track if the safemode pre-checks have completed. This is designed
+  // to prevent pipelines being created until sufficient nodes have registered.
+  private final AtomicBoolean pipelineCreationAllowed;
+
+ public PipelineManagerV2Impl(ConfigurationSource conf,
+ NodeManager nodeManager,
+ PipelineStateManagerV2 pipelineStateManager,
+ PipelineFactory pipelineFactory) {
+ this.lock = new ReentrantReadWriteLock();
+ this.pipelineFactory = pipelineFactory;
+ this.stateManager = pipelineStateManager;
+ this.conf = conf;
+ this.pmInfoBean = MBeans.register("SCMPipelineManager",
+ "SCMPipelineManagerInfo", this);
+ this.metrics = SCMPipelineMetrics.create();
+ this.pipelineWaitDefaultTimeout = conf.getTimeDuration(
+ HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
+ HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS);
+ this.isInSafeMode = new AtomicBoolean(conf.getBoolean(
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED,
+ HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT));
+ // Pipeline creation is only allowed after the safemode prechecks have
+ // passed, eg sufficient nodes have registered.
+ this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get());
+ }
+
+ public static PipelineManager newPipelineManager(
+ ConfigurationSource conf, SCMHAManager scmhaManager,
+ NodeManager nodeManager, Table<PipelineID, Pipeline> pipelineStore,
+ PipelineFactory pipelineFactory) throws IOException {
+ // Create PipelineStateManager
+ PipelineStateManagerV2 stateManager = PipelineStateManagerV2Impl
+ .newBuilder().setPipelineStore(pipelineStore)
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setNodeManager(nodeManager)
+ .build();
+
+ // Create PipelineManager
+ PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf,
+ nodeManager, stateManager, pipelineFactory);
+
+ // Create background thread.
+ Scheduler scheduler = new Scheduler(
+ "RatisPipelineUtilsThread", false, 1);
+ BackgroundPipelineCreator backgroundPipelineCreator =
+ new BackgroundPipelineCreator(pipelineManager, scheduler, conf);
+ pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
+ pipelineManager.setScheduler(scheduler);
+
+ return pipelineManager;
+ }
+
+  /**
+   * Creates a pipeline through the pipeline factory and records it in the
+   * state manager, all under the write lock.
+   *
+   * @param type replication type of the new pipeline
+   * @param factor replication factor of the new pipeline
+   * @return the created pipeline
+   * @throws IOException if creation is not yet allowed (safe mode prechecks
+   *         pending and factor is not ONE) or if creation fails
+   */
+  @Override
+  public Pipeline createPipeline(ReplicationType type,
+      ReplicationFactor factor) throws IOException {
+    final boolean rejected =
+        !isPipelineCreationAllowed() && factor != ReplicationFactor.ONE;
+    if (rejected) {
+      LOG.debug("Pipeline creation is not allowed until safe mode prechecks " +
+          "complete");
+      throw new IOException("Pipeline creation is not allowed as safe mode " +
+          "prechecks have not yet passed");
+    }
+
+    lock.writeLock().lock();
+    try {
+      final Pipeline created = pipelineFactory.create(type, factor);
+      stateManager.addPipeline(created.getProtobufMessage());
+      recordMetricsForPipeline(created);
+      return created;
+    } catch (IOException ex) {
+      LOG.error("Failed to create pipeline of type {} and factor {}. " +
+          "Exception: {}", type, factor, ex.getMessage());
+      metrics.incNumPipelineCreationFailed();
+      throw ex;
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+ /**
+ * Creates a pipeline over an explicit list of datanodes.
+ *
+ * NOTE(review): not implemented in this class. The arguments are ignored
+ * and the method always returns null, so callers must handle a null result.
+ *
+ * @param type replication type (ignored)
+ * @param factor replication factor (ignored)
+ * @param nodes datanodes for the pipeline (ignored)
+ * @return always null
+ */
+ @Override
+ public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
+ List<DatanodeDetails> nodes) {
+ return null;
+ }
+
+  /**
+   * Looks up a pipeline by id while holding the read lock.
+   *
+   * @param pipelineID id of the pipeline to fetch
+   * @return the pipeline returned by the state manager
+   * @throws PipelineNotFoundException propagated from the state manager
+   */
+  @Override
+  public Pipeline getPipeline(PipelineID pipelineID)
+      throws PipelineNotFoundException {
+    lock.readLock().lock();
+    try {
+      final Pipeline found = stateManager.getPipeline(pipelineID);
+      return found;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Reports whether a pipeline with the given id can be looked up.
+   *
+   * @param pipelineID id of the pipeline to check
+   * @return true if getPipeline succeeds, false if it throws
+   *         PipelineNotFoundException
+   */
+  @Override
+  public boolean containsPipeline(PipelineID pipelineID) {
+    lock.readLock().lock();
+    try {
+      // Only the success or failure of the lookup matters here.
+      getPipeline(pipelineID);
+    } catch (PipelineNotFoundException e) {
+      return false;
+    } finally {
+      lock.readLock().unlock();
+    }
+    return true;
+  }
+
+  /**
+   * Returns the pipelines reported by the state manager, read under the read
+   * lock.
+   *
+   * @return list of pipelines
+   */
+  @Override
+  public List<Pipeline> getPipelines() {
+    lock.readLock().lock();
+    try {
+      final List<Pipeline> all = stateManager.getPipelines();
+      return all;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns the pipelines of the given replication type, read under the read
+   * lock.
+   *
+   * @param type replication type to filter by
+   * @return list of matching pipelines
+   */
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type) {
+    // The lock must be acquired before the try block: releasing a read lock
+    // that the current thread does not hold throws
+    // IllegalMonitorStateException.
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelines(type);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns the pipelines matching the given replication type and factor,
+   * read under the read lock.
+   *
+   * @param type replication type to filter by
+   * @param factor replication factor to filter by
+   * @return list of matching pipelines
+   */
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+      ReplicationFactor factor) {
+    lock.readLock().lock();
+    try {
+      final List<Pipeline> matches = stateManager.getPipelines(type, factor);
+      return matches;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns the pipelines matching the given replication type and state,
+   * read under the read lock.
+   *
+   * @param type replication type to filter by
+   * @param state pipeline state to filter by
+   * @return list of matching pipelines
+   */
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+      Pipeline.PipelineState state) {
+    lock.readLock().lock();
+    try {
+      final List<Pipeline> matches = stateManager.getPipelines(type, state);
+      return matches;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns the pipelines matching the given replication type, factor and
+   * state, read under the read lock.
+   *
+   * @param type replication type to filter by
+   * @param factor replication factor to filter by
+   * @param state pipeline state to filter by
+   * @return list of matching pipelines
+   */
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+      ReplicationFactor factor,
+      Pipeline.PipelineState state) {
+    lock.readLock().lock();
+    try {
+      final List<Pipeline> matches =
+          stateManager.getPipelines(type, factor, state);
+      return matches;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns the pipelines matching the given type, factor and state, passing
+   * the exclusion collections through to the state manager. Read under the
+   * read lock.
+   *
+   * @param type replication type to filter by
+   * @param factor replication factor to filter by
+   * @param state pipeline state to filter by
+   * @param excludeDns datanodes handed to the state manager as exclusions
+   * @param excludePipelines pipeline ids handed to the state manager as
+   *        exclusions
+   * @return list of matching pipelines
+   */
+  @Override
+  public List<Pipeline> getPipelines(
+      ReplicationType type, ReplicationFactor factor,
+      Pipeline.PipelineState state, Collection<DatanodeDetails> excludeDns,
+      Collection<PipelineID> excludePipelines) {
+    lock.readLock().lock();
+    try {
+      final List<Pipeline> matches = stateManager.getPipelines(
+          type, factor, state, excludeDns, excludePipelines);
+      return matches;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Associates a container with a pipeline in the state manager, under the
+   * write lock.
+   *
+   * @param pipelineID pipeline receiving the container
+   * @param containerID container to add
+   * @throws IOException propagated from the state manager
+   */
+  @Override
+  public void addContainerToPipeline(
+      PipelineID pipelineID, ContainerID containerID) throws IOException {
+    final java.util.concurrent.locks.Lock writeLock = lock.writeLock();
+    writeLock.lock();
+    try {
+      stateManager.addContainerToPipeline(pipelineID, containerID);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Removes a container from a pipeline in the state manager, under the
+   * write lock.
+   *
+   * @param pipelineID pipeline the container belongs to
+   * @param containerID container to remove
+   * @throws IOException propagated from the state manager
+   */
+  @Override
+  public void removeContainerFromPipeline(
+      PipelineID pipelineID, ContainerID containerID) throws IOException {
+    final java.util.concurrent.locks.Lock writeLock = lock.writeLock();
+    writeLock.lock();
+    try {
+      stateManager.removeContainerFromPipeline(pipelineID, containerID);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Returns the containers the state manager holds for a pipeline, read
+   * under the read lock.
+   *
+   * @param pipelineID pipeline to query
+   * @return set of container ids for the pipeline
+   * @throws IOException propagated from the state manager
+   */
+  @Override
+  public NavigableSet<ContainerID> getContainersInPipeline(
+      PipelineID pipelineID) throws IOException {
+    lock.readLock().lock();
+    try {
+      final NavigableSet<ContainerID> containers =
+          stateManager.getContainers(pipelineID);
+      return containers;
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Returns the number of containers the state manager holds for a pipeline.
+   * Takes the read lock, like the other accessors of this class.
+   *
+   * @param pipelineID pipeline to query
+   * @return container count reported by the state manager
+   * @throws IOException propagated from the state manager
+   */
+  @Override
+  public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
+    lock.readLock().lock();
+    try {
+      return stateManager.getNumberOfContainers(pipelineID);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  /**
+   * Moves an ALLOCATED pipeline to OPEN and records creation metrics, under
+   * the write lock. Metrics are recorded for any pipeline that is not
+   * closed, whether or not a state change was needed.
+   *
+   * @param pipelineId id of the pipeline to open
+   * @throws IOException if the pipeline is closed, or propagated from the
+   *         state manager
+   */
+  @Override
+  public void openPipeline(PipelineID pipelineId) throws IOException {
+    lock.writeLock().lock();
+    try {
+      final Pipeline target = stateManager.getPipeline(pipelineId);
+      if (target.isClosed()) {
+        throw new IOException("Closed pipeline can not be opened");
+      }
+      final boolean stillAllocated =
+          target.getPipelineState() == Pipeline.PipelineState.ALLOCATED;
+      if (stillAllocated) {
+        LOG.info("Pipeline {} moved to OPEN state", target);
+        stateManager.updatePipelineState(pipelineId.getProtobuf(),
+            HddsProtos.PipelineState.PIPELINE_OPEN);
+      }
+      metrics.incNumPipelineCreated();
+      metrics.createPerPipelineMetrics(target);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Finalizes the pipeline, then destroys it either right away or via the
+   * scheduler after the configured destroy timeout.
+   *
+   * @param pipeline pipeline to be destroyed
+   * @param onTimeout if true, destruction is scheduled after
+   *        OZONE_SCM_PIPELINE_DESTROY_TIMEOUT; if false, it happens
+   *        immediately on the calling thread
+   * @throws IOException propagated from finalizing or destroying the pipeline
+   */
+  @Override
+  public void finalizeAndDestroyPipeline(Pipeline pipeline, boolean onTimeout)
+      throws IOException {
+    LOG.info("Destroying pipeline:{}", pipeline);
+    finalizePipeline(pipeline.getId());
+
+    if (!onTimeout) {
+      destroyPipeline(pipeline);
+      return;
+    }
+
+    final long destroyDelayMillis = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    final String failureMessage =
+        String.format("Destroy pipeline failed for pipeline:%s", pipeline);
+    scheduler.schedule(() -> destroyPipeline(pipeline),
+        destroyDelayMillis, TimeUnit.MILLISECONDS, LOG, failureMessage);
+  }
+
+ /**
+ * Moves the pipeline to CLOSED state and sends close container command for
+ * all the containers in the pipeline.
+ *
+ * @param pipelineId - ID of the pipeline to be moved to CLOSED state.
+ * @throws IOException
+ */
+ private void finalizePipeline(PipelineID pipelineId) throws IOException {
+ lock.writeLock().lock();
+ try {
+ Pipeline pipeline = stateManager.getPipeline(pipelineId);
+ if (!pipeline.isClosed()) {
+ stateManager.updatePipelineState(pipelineId.getProtobuf(),
+ HddsProtos.PipelineState.PIPELINE_CLOSED);
+ LOG.info("Pipeline {} moved to CLOSED state", pipeline);
+ }
+
+ // TODO fire events to datanodes for closing pipelines
+// Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
+// for (ContainerID containerID : containerIDs) {
+// eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
+// }
+ metrics.removePipelineMetrics(pipelineId);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ /**
+ * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
+ * the datanodes for ratis pipelines.
+ *
+ * @param pipeline - Pipeline to be destroyed
+ * @throws IOException
+ */
+ protected void destroyPipeline(Pipeline pipeline) throws IOException {
+ pipelineFactory.close(pipeline.getType(), pipeline);
+ // remove the pipeline from the pipeline manager
+ removePipeline(pipeline.getId());
+ triggerPipelineCreation();
+ }
+
+ /**
+ * Removes the pipeline from the db and pipeline state map.
+ *
+ * @param pipelineId - ID of the pipeline to be removed
+ * @throws IOException
+ */
+ protected void removePipeline(PipelineID pipelineId) throws IOException {
+ lock.writeLock().lock();
+ try {
+ stateManager.removePipeline(pipelineId.getProtobuf());
+ metrics.incNumPipelineDestroyed();
+ } catch (IOException ex) {
+ metrics.incNumPipelineDestroyFailed();
+ throw ex;
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+  /**
+   * Destroys RATIS/THREE pipelines that have stayed in ALLOCATED state for at
+   * least OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT. Any other type/factor
+   * combination is a no-op.
+   *
+   * @param type replication type; only RATIS is scrubbed
+   * @param factor replication factor; only THREE is scrubbed
+   * @throws IOException propagated from destroying a pipeline
+   */
+  @Override
+  public void scrubPipeline(ReplicationType type, ReplicationFactor factor)
+      throws IOException {
+    if (type != ReplicationType.RATIS || factor != ReplicationFactor.THREE) {
+      // Only scrub pipeline for RATIS THREE pipeline
+      return;
+    }
+    final Instant currentTime = Instant.now();
+    final long pipelineScrubTimeoutInMills = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+    final List<Pipeline> needToScrubPipelines = stateManager
+        .getPipelines(type, factor, Pipeline.PipelineState.ALLOCATED)
+        .stream()
+        .filter(p -> currentTime.toEpochMilli() - p.getCreationTimestamp()
+            .toEpochMilli() >= pipelineScrubTimeoutInMills)
+        .collect(Collectors.toList());
+    for (Pipeline p : needToScrubPipelines) {
+      // Duration.between(start, end): the creation time is the start, so the
+      // reported age is positive.
+      LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
+          " since it stays at ALLOCATED stage for " +
+          Duration.between(p.getCreationTimestamp(), currentTime).toMinutes() +
+          " mins.");
+      finalizeAndDestroyPipeline(p, false);
+    }
+  }
+
+  /**
+   * Asks the background pipeline creator to start its fixed interval
+   * pipeline creation job.
+   */
+  @Override
+  public void startPipelineCreator() {
+    final BackgroundPipelineCreator creator = backgroundPipelineCreator;
+    creator.startFixedIntervalPipelineCreator();
+  }
+
+  /**
+   * Asks the background pipeline creator to trigger pipeline creation.
+   */
+  @Override
+  public void triggerPipelineCreation() {
+    final BackgroundPipelineCreator creator = backgroundPipelineCreator;
+    creator.triggerPipelineCreation();
+  }
+
+  /**
+   * Increments the blocks-allocated metric for the given pipeline.
+   *
+   * @param id pipeline whose metric is incremented
+   */
+  @Override
+  public void incNumBlocksAllocatedMetric(PipelineID id) {
+    final SCMPipelineMetrics pipelineMetrics = metrics;
+    pipelineMetrics.incNumBlocksAllocated(id);
+  }
+
+  /**
+   * Activates a dormant pipeline by moving it to the OPEN state. The state
+   * change is made under the write lock, like the other state transitions in
+   * this class.
+   *
+   * @param pipelineID ID of the pipeline to activate.
+   * @throws IOException in case of any Exception
+   */
+  @Override
+  public void activatePipeline(PipelineID pipelineID)
+      throws IOException {
+    lock.writeLock().lock();
+    try {
+      stateManager.updatePipelineState(pipelineID.getProtobuf(),
+          HddsProtos.PipelineState.PIPELINE_OPEN);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Deactivates an active pipeline by moving it to the DORMANT state. The
+   * state change is made under the write lock, like the other state
+   * transitions in this class.
+   *
+   * @param pipelineID ID of the pipeline to deactivate.
+   * @throws IOException in case of any Exception
+   */
+  @Override
+  public void deactivatePipeline(PipelineID pipelineID)
+      throws IOException {
+    lock.writeLock().lock();
+    try {
+      stateManager.updatePipelineState(pipelineID.getProtobuf(),
+          HddsProtos.PipelineState.PIPELINE_DORMANT);
+    } finally {
+      lock.writeLock().unlock();
+    }
+  }
+
+  /**
+   * Waits for a pipeline to be OPEN, polling its state every 100 ms.
+   *
+   * @param pipelineID ID of the pipeline to wait for.
+   * @param timeout wait timeout, millisecond, 0 to use default value
+   * @throws IOException if the pipeline cannot be found, if it is not open
+   *         when the timeout elapses, or if the waiting thread is interrupted
+   */
+  @Override
+  public void waitPipelineReady(PipelineID pipelineID, long timeout)
+      throws IOException {
+    long st = Time.monotonicNow();
+    if (timeout == 0) {
+      timeout = pipelineWaitDefaultTimeout;
+    }
+
+    boolean ready;
+    Pipeline pipeline;
+    do {
+      try {
+        pipeline = stateManager.getPipeline(pipelineID);
+      } catch (PipelineNotFoundException e) {
+        throw new PipelineNotFoundException(String.format(
+            "Pipeline %s cannot be found", pipelineID));
+      }
+      ready = pipeline.isOpen();
+      if (!ready) {
+        try {
+          Thread.sleep((long)100);
+        } catch (InterruptedException e) {
+          // Restore the interrupt flag and stop waiting. Looping on with the
+          // flag set would make every later sleep() throw immediately and
+          // turn this loop into a busy spin until the timeout.
+          Thread.currentThread().interrupt();
+          throw new IOException(String.format(
+              "Interrupted while waiting for pipeline %s to be ready",
+              pipelineID), e);
+        }
+      }
+    } while (!ready && Time.monotonicNow() - st < timeout);
+
+    if (!ready) {
+      throw new IOException(String.format("Pipeline %s is not ready in %d ms",
+          pipelineID, timeout));
+    }
+  }
+
+  /**
+   * Counts the pipelines per pipeline state. Every PipelineState value is
+   * present as a key, with 0 for states that have no pipelines.
+   *
+   * @return map from state name to number of pipelines in that state
+   */
+  @Override
+  public Map<String, Integer> getPipelineInfo() {
+    final Map<String, Integer> countsByState = new HashMap<>();
+    for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
+      countsByState.put(state.toString(), 0);
+    }
+    for (Pipeline pipeline : stateManager.getPipelines()) {
+      final String stateName = pipeline.getPipelineState().toString();
+      countsByState.computeIfPresent(stateName, (name, count) -> count + 1);
+    }
+    return countsByState;
+  }
+
+  /**
+   * Returns the safe mode flag tracked by this manager.
+   *
+   * @return true if the manager currently considers SCM to be in safe mode
+   */
+  @Override
+  public boolean getSafeModeStatus() {
+    final boolean inSafeMode = isInSafeMode.get();
+    return inSafeMode;
+  }
+
+  /**
+   * Releases the resources held by this manager: closes the scheduler,
+   * unregisters the MXBean and the pipeline metrics, and shuts down the
+   * pipeline factory. The scheduler and MXBean references are cleared once
+   * released.
+   *
+   * @throws IOException declared by the interface
+   */
+  @Override
+  public void close() throws IOException {
+    final Scheduler activeScheduler = scheduler;
+    if (activeScheduler != null) {
+      activeScheduler.close();
+      scheduler = null;
+    }
+
+    final ObjectName registeredBean = pmInfoBean;
+    if (registeredBean != null) {
+      MBeans.unregister(registeredBean);
+      pmInfoBean = null;
+    }
+
+    SCMPipelineMetrics.unRegister();
+
+    // Finally shut down the pipeline provider.
+    pipelineFactory.shutdown();
+  }
+
+  /**
+   * Handles a safe mode status event: stores the new precheck and safe mode
+   * flags, triggers pipeline creation when the precheck has just completed,
+   * and starts the pipeline creator when safe mode has just been left.
+   *
+   * @param status new safe mode status
+   * @param publisher event publisher (not used here)
+   */
+  @Override
+  public void onMessage(SCMSafeModeManager.SafeModeStatus status,
+      EventPublisher publisher) {
+    // TODO: #CLUTIL - handle safemode getting re-enabled
+    final boolean creationWasAllowed =
+        pipelineCreationAllowed.getAndSet(status.isPreCheckComplete());
+    final boolean wasInSafeMode =
+        isInSafeMode.getAndSet(status.isInSafeMode());
+
+    // Trigger creation only on the transition of the precheck to complete.
+    if (isPipelineCreationAllowed() && !creationWasAllowed) {
+      triggerPipelineCreation();
+    }
+    // Start the creator thread only on the transition out of safe mode.
+    if (!getSafeModeStatus() && wasInSafeMode) {
+      startPipelineCreator();
+    }
+  }
+
+  /**
+   * Returns whether pipeline creation is currently allowed.
+   *
+   * @return current value of the pipelineCreationAllowed flag
+   */
+  @VisibleForTesting
+  public boolean isPipelineCreationAllowed() {
+    final boolean allowed = pipelineCreationAllowed.get();
+    return allowed;
+  }
+
+  /**
+   * Installs the background pipeline creator used by startPipelineCreator()
+   * and triggerPipelineCreation().
+   *
+   * @param backgroundPipelineCreator creator to use
+   */
+  private void setBackgroundPipelineCreator(
+      final BackgroundPipelineCreator backgroundPipelineCreator) {
+    this.backgroundPipelineCreator = backgroundPipelineCreator;
+  }
+
+  /**
+   * Installs the scheduler used for delayed pipeline destruction.
+   *
+   * @param scheduler scheduler to use
+   */
+  private void setScheduler(final Scheduler scheduler) {
+    this.scheduler = scheduler;
+  }
+
+ private void recordMetricsForPipeline(Pipeline pipeline) {
+ metrics.incNumPipelineAllocated();
+ if (pipeline.isOpen()) {
+ metrics.incNumPipelineCreated();
+ metrics.createPerPipelineMetrics(pipeline);
+ }
+ switch (pipeline.getType()) {
+ case STAND_ALONE:
+ return;
+ case RATIS:
+ List<Pipeline> overlapPipelines = RatisPipelineUtils
+ .checkPipelineContainSameDatanodes(stateManager, pipeline);
+ if (!overlapPipelines.isEmpty()) {
+ // Count 1 overlap at a time.
+ metrics.incNumPipelineContainSameDatanodes();
+ //TODO remove until pipeline allocation is proved equally distributed.
+ for (Pipeline overlapPipeline : overlapPipelines) {
+ LOG.info("Pipeline: " + pipeline.getId().toString() +
+ " contains same datanodes as previous pipelines: " +
+ overlapPipeline.getId().toString() + " nodeIds: " +
+ pipeline.getNodes().get(0).getUuid().toString() +
+ ", " + pipeline.getNodes().get(1).getUuid().toString() +
+ ", " + pipeline.getNodes().get(2).getUuid().toString());
+ }
+ }
+ return;
+ case CHAINED:
+ // Not supported.
+ default:
+ // Not supported.
+ return;
+ }
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java
new file mode 100644
index 0000000..4021575
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.NavigableSet;
+
+/**
+ * Manages the state of pipelines in SCM.
+ * The mutating methods annotated with {@code @Replicate} are documented as
+ * being replicated to Ratis; the remaining methods carry no such annotation.
+ */
+public interface PipelineStateManagerV2 {
+
+ /**
+ * Adding pipeline would be replicated to Ratis.
+ * @param pipelineProto protobuf form of the pipeline to add
+ * @throws IOException
+ */
+ @Replicate
+ void addPipeline(HddsProtos.Pipeline pipelineProto) throws IOException;
+
+ /**
+ * Removing pipeline would be replicated to Ratis.
+ * @param pipelineIDProto protobuf form of the id of the pipeline to remove
+ * @throws IOException
+ */
+ @Replicate
+ void removePipeline(HddsProtos.PipelineID pipelineIDProto)
+ throws IOException;
+
+ /**
+ * Updating pipeline state would be replicated to Ratis.
+ * @param pipelineIDProto protobuf form of the id of the pipeline to update
+ * @param newState state the pipeline should be moved to
+ * @throws IOException
+ */
+ @Replicate
+ void updatePipelineState(HddsProtos.PipelineID pipelineIDProto,
+ HddsProtos.PipelineState newState)
+ throws IOException;
+
+ /** Associates the given container with the given pipeline. */
+ void addContainerToPipeline(PipelineID pipelineID,
+ ContainerID containerID) throws IOException;
+
+ /** Returns the pipeline with the given id. */
+ Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException;
+
+ /** Returns the pipelines known to this state manager. */
+ List<Pipeline> getPipelines();
+
+ /** Returns the pipelines of the given replication type. */
+ List<Pipeline> getPipelines(HddsProtos.ReplicationType type);
+
+ /** Returns the pipelines of the given replication type and factor. */
+ List<Pipeline> getPipelines(HddsProtos.ReplicationType type,
+ HddsProtos.ReplicationFactor factor);
+
+ /** Returns the pipelines of the given type and factor in the given state. */
+ List<Pipeline> getPipelines(HddsProtos.ReplicationType type,
+ HddsProtos.ReplicationFactor factor,
+ Pipeline.PipelineState state);
+
+ /**
+ * Returns the pipelines of the given type, factor and state.
+ * Presumably pipelines containing any of excludeDns, or whose id is in
+ * excludePipelines, are left out - TODO confirm against the implementation.
+ */
+ List<Pipeline> getPipelines(HddsProtos.ReplicationType type,
+ HddsProtos.ReplicationFactor factor,
+ Pipeline.PipelineState state,
+ Collection<DatanodeDetails> excludeDns,
+ Collection<PipelineID> excludePipelines);
+
+ /** Returns the pipelines of the given type for the given states. */
+ List<Pipeline> getPipelines(HddsProtos.ReplicationType type,
+ Pipeline.PipelineState... states);
+
+ /** Returns the ids of the containers associated with the given pipeline. */
+ NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
+ throws IOException;
+
+ /** Returns the number of containers associated with the given pipeline. */
+ int getNumberOfContainers(PipelineID pipelineID) throws IOException;
+
+
+ /** Removes the given container from the given pipeline. */
+ void removeContainerFromPipeline(PipelineID pipelineID,
+ ContainerID containerID) throws IOException;
+
+ /** Releases the resources held by this state manager. */
+ void close() throws Exception;
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
new file mode 100644
index 0000000..c74dc86
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
@@ -0,0 +1,226 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.SCMRatisProtocol;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.lang.reflect.Proxy;
+import java.util.Collection;
+import java.util.List;
+import java.util.NavigableSet;
+
+/**
+ * Implementation of pipeline state manager.
+ * PipelineStateMap class holds the data structures related to pipeline and its
+ * state. All the read and write operations in PipelineStateMap are protected
+ * by a read write lock.
+ */
+public class PipelineStateManagerV2Impl implements PipelineStateManagerV2 {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(PipelineStateManager.class);
+
+ private final PipelineStateMap pipelineStateMap;
+ private final NodeManager nodeManager;
+ private Table<PipelineID, Pipeline> pipelineStore;
+
+ public PipelineStateManagerV2Impl(
+ Table<PipelineID, Pipeline> pipelineStore, NodeManager nodeManager)
+ throws IOException {
+ this.pipelineStateMap = new PipelineStateMap();
+ this.nodeManager = nodeManager;
+ this.pipelineStore = pipelineStore;
+ initialize();
+ }
+
+ private void initialize() throws IOException {
+ if (pipelineStore == null || nodeManager == null) {
+ throw new IOException("PipelineStore cannot be null");
+ }
+ if (pipelineStore.isEmpty()) {
+ LOG.info("No pipeline exists in current db");
+ return;
+ }
+ TableIterator<PipelineID, ? extends Table.KeyValue<PipelineID, Pipeline>>
+ iterator = pipelineStore.iterator();
+ while (iterator.hasNext()) {
+ Pipeline pipeline = iterator.next().getValue();
+ addPipeline(pipeline.getProtobufMessage());
+ }
+ }
+
+ @Override
+ public void addPipeline(HddsProtos.Pipeline pipelineProto)
+ throws IOException {
+ Pipeline pipeline = Pipeline.getFromProtobuf(pipelineProto);
+ pipelineStore.put(pipeline.getId(), pipeline);
+ pipelineStateMap.addPipeline(pipeline);
+ nodeManager.addPipeline(pipeline);
+ LOG.info("Created pipeline {}.", pipeline);
+ }
+
+ @Override
+ public void addContainerToPipeline(
+ PipelineID pipelineId, ContainerID containerID)
+ throws IOException {
+ pipelineStateMap.addContainerToPipeline(pipelineId, containerID);
+ }
+
+ @Override
+ public Pipeline getPipeline(PipelineID pipelineID)
+ throws PipelineNotFoundException {
+ return pipelineStateMap.getPipeline(pipelineID);
+ }
+
+ @Override
+ public List<Pipeline> getPipelines() {
+ return pipelineStateMap.getPipelines();
+ }
+
+ @Override
+ public List<Pipeline> getPipelines(HddsProtos.ReplicationType type) {
+ return pipelineStateMap.getPipelines(type);
+ }
+
+ @Override
+ public List<Pipeline> getPipelines(
+ HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor) {
+ return pipelineStateMap.getPipelines(type, factor);
+ }
+
+ @Override
+ public List<Pipeline> getPipelines(
+ HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
+ Pipeline.PipelineState state) {
+ return pipelineStateMap.getPipelines(type, factor, state);
+ }
+
+ @Override
+ public List<Pipeline> getPipelines(
+ HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
+ Pipeline.PipelineState state, Collection<DatanodeDetails> excludeDns,
+ Collection<PipelineID> excludePipelines) {
+ return pipelineStateMap
+ .getPipelines(type, factor, state, excludeDns, excludePipelines);
+ }
+
+ @Override
+ public List<Pipeline> getPipelines(HddsProtos.ReplicationType type,
+ Pipeline.PipelineState... states) {
+ return pipelineStateMap.getPipelines(type, states);
+ }
+
+ @Override
+ public NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
+ throws IOException {
+ return pipelineStateMap.getContainers(pipelineID);
+ }
+
+ @Override
+ public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
+ return pipelineStateMap.getNumberOfContainers(pipelineID);
+ }
+
+ @Override
+ public void removePipeline(HddsProtos.PipelineID pipelineIDProto)
+ throws IOException {
+ PipelineID pipelineID = PipelineID.getFromProtobuf(pipelineIDProto);
+ pipelineStore.delete(pipelineID);
+ Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID);
+ nodeManager.removePipeline(pipeline);
+ LOG.info("Pipeline {} removed.", pipeline);
+ return;
+ }
+
+
+ @Override
+ public void removeContainerFromPipeline(
+ PipelineID pipelineID, ContainerID containerID) throws IOException {
+ pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
+ }
+
+ @Override
+ public void updatePipelineState(
+ HddsProtos.PipelineID pipelineIDProto, HddsProtos.PipelineState newState)
+ throws IOException {
+ pipelineStateMap.updatePipelineState(
+ PipelineID.getFromProtobuf(pipelineIDProto),
+ Pipeline.PipelineState.fromProtobuf(newState));
+ }
+
+ @Override
+ public void close() throws Exception {
+ pipelineStore.close();
+ }
+
+ public static Builder newBuilder() {
+ return new Builder();
+ }
+
+ /**
+ * Builder for PipelineStateManager.
+ */
+ public static class Builder {
+ private Table<PipelineID, Pipeline> pipelineStore;
+ private NodeManager nodeManager;
+ private SCMRatisServer scmRatisServer;
+
+ public Builder setRatisServer(final SCMRatisServer ratisServer) {
+ scmRatisServer = ratisServer;
+ return this;
+ }
+
+ public Builder setNodeManager(final NodeManager scmNodeManager) {
+ nodeManager = scmNodeManager;
+ return this;
+ }
+
+ public Builder setPipelineStore(
+ final Table<PipelineID, Pipeline> pipelineTable) {
+ this.pipelineStore = pipelineTable;
+ return this;
+ }
+
+ public PipelineStateManagerV2 build() throws IOException {
+ Preconditions.checkNotNull(pipelineStore);
+
+ final PipelineStateManagerV2 pipelineStateManager =
+ new PipelineStateManagerV2Impl(pipelineStore, nodeManager);
+
+ final SCMHAInvocationHandler invocationHandler =
+ new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.PIPELINE,
+ pipelineStateManager, scmRatisServer);
+
+ return (PipelineStateManagerV2) Proxy.newProxyInstance(
+ SCMHAInvocationHandler.class.getClassLoader(),
+ new Class<?>[]{PipelineStateManagerV2.class}, invocationHandler);
+ }
+ }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 5c9b202..edc40af 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -117,4 +117,23 @@ public final class RatisPipelineUtils {
p.sameDatanodes(pipeline)))
.collect(Collectors.toList());
}
+
+ /**
+ * Return the list of pipelines who share the same set of datanodes
+ * with the input pipeline.
+ *
+ * @param stateManager PipelineStateManager
+ * @param pipeline input pipeline
+ * @return list of matched pipeline
+ */
+ static List<Pipeline> checkPipelineContainSameDatanodes(
+ PipelineStateManagerV2 stateManager, Pipeline pipeline) {
+ return stateManager.getPipelines(
+ HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.THREE)
+ .stream().filter(p -> !p.getId().equals(pipeline.getId()) &&
+ (p.getPipelineState() != Pipeline.PipelineState.CLOSED &&
+ p.sameDatanodes(pipeline)))
+ .collect(Collectors.toList());
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org