You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by li...@apache.org on 2020/06/03 03:34:31 UTC

[hadoop-ozone] branch HDDS-2823 updated: HDDS-3693 Switch to new StateManager interface. (#1007)

This is an automated email from the ASF dual-hosted git repository.

licheng pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/HDDS-2823 by this push:
     new bc68c48  HDDS-3693 Switch to new StateManager interface. (#1007)
bc68c48 is described below

commit bc68c4843a84b9738c8f3e1bd3c47eaf47d4f12f
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Wed Jun 3 11:34:21 2020 +0800

    HDDS-3693 Switch to new StateManager interface. (#1007)
    
    Co-authored-by: Li Cheng <ti...@tencent.com>
---
 .../hadoop/hdds/scm/pipeline/PipelineFactory.java  |  2 +-
 .../hdds/scm/pipeline/PipelineManagerV2Impl.java   | 22 ++++--
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |  5 +-
 .../hadoop/hdds/scm/pipeline/PipelineProvider.java |  6 +-
 .../hdds/scm/pipeline/PipelineStateManager.java    | 80 ++++++++++++++++----
 .../scm/pipeline/PipelineStateManagerV2Impl.java   | 52 +++++++++++--
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |  4 +-
 .../hdds/scm/pipeline/RatisPipelineUtils.java      |  2 +-
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  4 +-
 .../hdds/scm/pipeline/SimplePipelineProvider.java  |  2 +-
 ...pelineStateManagerV2.java => StateManager.java} | 23 +++++-
 .../scm/pipeline/MockRatisPipelineProvider.java    | 20 ++---
 .../hdds/scm/pipeline/TestPipelieManagerImpl.java  | 87 ++++++++++++++++++++++
 .../TestPipelineDatanodesIntersection.java         |  2 +-
 .../scm/pipeline/TestPipelineStateManager.java     |  2 +-
 .../ozone/recon/scm/ReconPipelineManager.java      |  3 +-
 16 files changed, 262 insertions(+), 54 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index e1cf382..bdd5053 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -39,7 +39,7 @@ public class PipelineFactory {
 
   private Map<ReplicationType, PipelineProvider> providers;
 
-  PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager,
+  PipelineFactory(NodeManager nodeManager, StateManager stateManager,
       ConfigurationSource conf, EventPublisher eventPublisher) {
     providers = new HashMap<>();
     providers.put(ReplicationType.STAND_ALONE,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index a6a3249..f451000 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -58,13 +58,13 @@ import java.util.stream.Collectors;
  * All the write operations for pipelines must come via PipelineManager.
  * It synchronises all write and read operations via a ReadWriteLock.
  */
-public class PipelineManagerV2Impl implements PipelineManager {
+public final class PipelineManagerV2Impl implements PipelineManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(SCMPipelineManager.class);
 
   private final ReadWriteLock lock;
   private PipelineFactory pipelineFactory;
-  private PipelineStateManagerV2 stateManager;
+  private StateManager stateManager;
   private Scheduler scheduler;
   private BackgroundPipelineCreator backgroundPipelineCreator;
   private final ConfigurationSource conf;
@@ -77,9 +77,9 @@ public class PipelineManagerV2Impl implements PipelineManager {
   // to prevent pipelines being created until sufficient nodes have registered.
   private final AtomicBoolean pipelineCreationAllowed;
 
-  public PipelineManagerV2Impl(ConfigurationSource conf,
+  private PipelineManagerV2Impl(ConfigurationSource conf,
                                NodeManager nodeManager,
-                               PipelineStateManagerV2 pipelineStateManager,
+                               StateManager pipelineStateManager,
                                PipelineFactory pipelineFactory) {
     this.lock = new ReentrantReadWriteLock();
     this.pipelineFactory = pipelineFactory;
@@ -100,17 +100,20 @@ public class PipelineManagerV2Impl implements PipelineManager {
     this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get());
   }
 
-  public static PipelineManager newPipelineManager(
+  public static PipelineManagerV2Impl newPipelineManager(
       ConfigurationSource conf, SCMHAManager scmhaManager,
       NodeManager nodeManager, Table<PipelineID, Pipeline> pipelineStore,
-      PipelineFactory pipelineFactory) throws IOException {
+      EventPublisher eventPublisher) throws IOException {
     // Create PipelineStateManager
-    PipelineStateManagerV2 stateManager = PipelineStateManagerV2Impl
+    StateManager stateManager = PipelineStateManagerV2Impl
         .newBuilder().setPipelineStore(pipelineStore)
         .setRatisServer(scmhaManager.getRatisServer())
         .setNodeManager(nodeManager)
         .build();
 
+    // Create PipelineFactory
+    PipelineFactory pipelineFactory = new PipelineFactory(
+        nodeManager, stateManager, conf, eventPublisher);
     // Create PipelineManager
     PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf,
         nodeManager, stateManager, pipelineFactory);
@@ -572,6 +575,11 @@ public class PipelineManagerV2Impl implements PipelineManager {
     return pipelineCreationAllowed.get();
   }
 
+  @VisibleForTesting
+  public void allowPipelineCreation() {
+    this.pipelineCreationAllowed.set(true);
+  }
+
   private void setBackgroundPipelineCreator(
       BackgroundPipelineCreator backgroundPipelineCreator) {
     this.backgroundPipelineCreator = backgroundPipelineCreator;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 524b5ec..6573271 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -51,7 +51,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
   static final Logger LOG =
       LoggerFactory.getLogger(PipelinePlacementPolicy.class);
   private final NodeManager nodeManager;
-  private final PipelineStateManager stateManager;
+  private final StateManager stateManager;
   private final ConfigurationSource conf;
   private final int heavyNodeCriteria;
   private static final int REQUIRED_RACKS = 2;
@@ -65,7 +65,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
    * @param conf        Configuration
    */
   public PipelinePlacementPolicy(final NodeManager nodeManager,
-      final PipelineStateManager stateManager, final ConfigurationSource conf) {
+                                 final StateManager stateManager,
+                                 final ConfigurationSource conf) {
     super(nodeManager, conf);
     this.nodeManager = nodeManager;
     this.conf = conf;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index 533f77e..576d415 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -37,10 +37,10 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
 public abstract class PipelineProvider {
 
   private final NodeManager nodeManager;
-  private final PipelineStateManager stateManager;
+  private final StateManager stateManager;
 
   public PipelineProvider(NodeManager nodeManager,
-      PipelineStateManager stateManager) {
+      StateManager stateManager) {
     this.nodeManager = nodeManager;
     this.stateManager = stateManager;
   }
@@ -54,7 +54,7 @@ public abstract class PipelineProvider {
     return nodeManager;
   }
 
-  public PipelineStateManager getPipelineStateManager() {
+  public StateManager getPipelineStateManager() {
     return stateManager;
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index bb56a03..de6f186 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.scm.pipeline;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -38,7 +39,7 @@ import java.util.NavigableSet;
  * state. All the read and write operations in PipelineStateMap are protected
  * by a read write lock.
  */
-public class PipelineStateManager {
+public class PipelineStateManager implements StateManager {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(PipelineStateManager.class);
@@ -48,71 +49,90 @@ public class PipelineStateManager {
   public PipelineStateManager() {
     this.pipelineStateMap = new PipelineStateMap();
   }
-
+  @Override
   public void addPipeline(Pipeline pipeline) throws IOException {
     pipelineStateMap.addPipeline(pipeline);
     LOG.info("Created pipeline {}", pipeline);
   }
 
-  void addContainerToPipeline(PipelineID pipelineId, ContainerID containerID)
+  @Override
+  public void addContainerToPipeline(PipelineID pipelineId,
+                                     ContainerID containerID)
       throws IOException {
     pipelineStateMap.addContainerToPipeline(pipelineId, containerID);
   }
 
-  Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException {
+  @Override
+  public Pipeline getPipeline(PipelineID pipelineID)
+      throws PipelineNotFoundException {
     return pipelineStateMap.getPipeline(pipelineID);
   }
 
+  @Override
   public List<Pipeline> getPipelines() {
     return pipelineStateMap.getPipelines();
   }
 
-  List<Pipeline> getPipelines(ReplicationType type) {
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type) {
     return pipelineStateMap.getPipelines(type);
   }
 
-  List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor) {
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+                                     ReplicationFactor factor) {
     return pipelineStateMap.getPipelines(type, factor);
   }
 
-  List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+                                     ReplicationFactor factor,
       PipelineState state) {
     return pipelineStateMap.getPipelines(type, factor, state);
   }
 
-  List<Pipeline> getPipelines(ReplicationType type, ReplicationFactor factor,
+  @Override
+  public List<Pipeline> getPipelines(
+      ReplicationType type, ReplicationFactor factor,
       PipelineState state, Collection<DatanodeDetails> excludeDns,
       Collection<PipelineID> excludePipelines) {
     return pipelineStateMap
         .getPipelines(type, factor, state, excludeDns, excludePipelines);
   }
 
-  List<Pipeline> getPipelines(ReplicationType type, PipelineState... states) {
+  @Override
+  public List<Pipeline> getPipelines(ReplicationType type,
+                                     PipelineState... states) {
     return pipelineStateMap.getPipelines(type, states);
   }
 
-  NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
+  @Override
+  public NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
       throws IOException {
     return pipelineStateMap.getContainers(pipelineID);
   }
 
-  int getNumberOfContainers(PipelineID pipelineID) throws IOException {
+  @Override
+  public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
     return pipelineStateMap.getNumberOfContainers(pipelineID);
   }
 
-  Pipeline removePipeline(PipelineID pipelineID) throws IOException {
+  @Override
+  public Pipeline removePipeline(PipelineID pipelineID) throws IOException {
     Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID);
     LOG.info("Pipeline {} removed from db", pipeline);
     return pipeline;
   }
 
-  void removeContainerFromPipeline(PipelineID pipelineID,
+  @Override
+  public void removeContainerFromPipeline(PipelineID pipelineID,
       ContainerID containerID) throws IOException {
     pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
   }
 
-  Pipeline finalizePipeline(PipelineID pipelineId)
-      throws PipelineNotFoundException {
+  @Override
+  public Pipeline finalizePipeline(PipelineID pipelineId)
+      throws IOException {
     Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId);
     if (!pipeline.isClosed()) {
       pipeline = pipelineStateMap
@@ -122,7 +142,8 @@ public class PipelineStateManager {
     return pipeline;
   }
 
-  Pipeline openPipeline(PipelineID pipelineId) throws IOException {
+  @Override
+  public Pipeline openPipeline(PipelineID pipelineId) throws IOException {
     Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId);
     if (pipeline.isClosed()) {
       throw new IOException("Closed pipeline can not be opened");
@@ -141,6 +162,7 @@ public class PipelineStateManager {
    * @param pipelineID ID of the pipeline to activate.
    * @throws IOException in case of any Exception
    */
+  @Override
   public void activatePipeline(PipelineID pipelineID)
       throws IOException {
     pipelineStateMap
@@ -153,14 +175,40 @@ public class PipelineStateManager {
    * @param pipelineID ID of the pipeline to deactivate.
    * @throws IOException in case of any Exception
    */
+  @Override
   public void deactivatePipeline(PipelineID pipelineID)
       throws IOException {
     pipelineStateMap
         .updatePipelineState(pipelineID, PipelineState.DORMANT);
   }
 
+  @Override
   public void updatePipelineState(PipelineID id, PipelineState newState)
       throws PipelineNotFoundException {
     pipelineStateMap.updatePipelineState(id, newState);
   }
+
+  @Override
+  public void addPipeline(HddsProtos.Pipeline pipelineProto)
+      throws IOException {
+    throw new IOException("Not supported.");
+  }
+
+  @Override
+  public void removePipeline(HddsProtos.PipelineID pipelineIDProto)
+      throws IOException {
+    throw new IOException("Not supported.");
+  }
+
+  @Override
+  public void updatePipelineState(HddsProtos.PipelineID pipelineIDProto,
+                                  HddsProtos.PipelineState newState)
+      throws IOException {
+    throw new IOException("Not supported.");
+  }
+
+  @Override
+  public void close() {
+    // Do nothing
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
index c74dc86..703cdec 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
@@ -42,7 +42,7 @@ import java.util.NavigableSet;
  * state. All the read and write operations in PipelineStateMap are protected
  * by a read write lock.
  */
-public class PipelineStateManagerV2Impl implements PipelineStateManagerV2 {
+public class PipelineStateManagerV2Impl implements StateManager {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(PipelineStateManager.class);
@@ -180,6 +180,48 @@ public class PipelineStateManagerV2Impl implements PipelineStateManagerV2 {
     pipelineStore.close();
   }
 
+  // TODO Remove the legacy interfaces below.
+  @Override
+  public void addPipeline(Pipeline pipeline) throws IOException {
+    throw new IOException("Not supported.");
+  }
+
+  @Override
+  public Pipeline removePipeline(PipelineID pipelineID) throws IOException {
+    throw new IOException("Not supported.");
+  }
+
+  @Override
+  public void updatePipelineState(PipelineID id,
+                                  Pipeline.PipelineState newState)
+      throws IOException {
+    throw new IOException("Not supported.");
+  }
+
+  @Override
+  public Pipeline finalizePipeline(PipelineID pipelineId)
+      throws IOException {
+    throw new IOException("Not supported.");
+  }
+
+
+  @Override
+  public Pipeline openPipeline(PipelineID pipelineId) throws IOException {
+    throw new IOException("Not supported.");
+  }
+
+  @Override
+  public void activatePipeline(PipelineID pipelineID) throws IOException {
+    throw new IOException("Not supported.");
+  }
+
+  @Override
+  public void deactivatePipeline(PipelineID pipelineID) throws IOException {
+    throw new IOException("Not supported.");
+  }
+
+  // legacy interfaces end
+
   public static Builder newBuilder() {
     return new Builder();
   }
@@ -208,19 +250,19 @@ public class PipelineStateManagerV2Impl implements PipelineStateManagerV2 {
       return this;
     }
 
-    public PipelineStateManagerV2 build() throws IOException {
+    public StateManager build() throws IOException {
       Preconditions.checkNotNull(pipelineStore);
 
-      final PipelineStateManagerV2 pipelineStateManager =
+      final StateManager pipelineStateManager =
           new PipelineStateManagerV2Impl(pipelineStore, nodeManager);
 
       final SCMHAInvocationHandler invocationHandler =
           new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.PIPELINE,
               pipelineStateManager, scmRatisServer);
 
-      return (PipelineStateManagerV2) Proxy.newProxyInstance(
+      return (StateManager) Proxy.newProxyInstance(
           SCMHAInvocationHandler.class.getClassLoader(),
-          new Class<?>[]{PipelineStateManagerV2.class}, invocationHandler);
+          new Class<?>[]{StateManager.class}, invocationHandler);
     }
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 4d91541..821ed30 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -54,8 +54,8 @@ public class RatisPipelineProvider extends PipelineProvider {
   private int maxPipelinePerDatanode;
 
   RatisPipelineProvider(NodeManager nodeManager,
-      PipelineStateManager stateManager, ConfigurationSource conf,
-      EventPublisher eventPublisher) {
+                        StateManager stateManager, ConfigurationSource conf,
+                        EventPublisher eventPublisher) {
     super(nodeManager, stateManager);
     this.conf = conf;
     this.eventPublisher = eventPublisher;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index edc40af..19a8fc5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -127,7 +127,7 @@ public final class RatisPipelineUtils {
    * @return list of matched pipeline
    */
   static List<Pipeline> checkPipelineContainSameDatanodes(
-      PipelineStateManagerV2 stateManager, Pipeline pipeline) {
+      StateManager stateManager, Pipeline pipeline) {
     return stateManager.getPipelines(
         HddsProtos.ReplicationType.RATIS,
         HddsProtos.ReplicationFactor.THREE)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 941ce19..71c3190 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -68,7 +68,7 @@ public class SCMPipelineManager implements PipelineManager {
 
   private final ReadWriteLock lock;
   private PipelineFactory pipelineFactory;
-  private PipelineStateManager stateManager;
+  private StateManager stateManager;
   private final BackgroundPipelineCreator backgroundPipelineCreator;
   private Scheduler scheduler;
 
@@ -133,7 +133,7 @@ public class SCMPipelineManager implements PipelineManager {
     this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get());
   }
 
-  public PipelineStateManager getStateManager() {
+  public StateManager getStateManager() {
     return stateManager;
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index c7b6305..69711bb 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -34,7 +34,7 @@ import java.util.List;
 public class SimplePipelineProvider extends PipelineProvider {
 
   public SimplePipelineProvider(NodeManager nodeManager,
-      PipelineStateManager stateManager) {
+      StateManager stateManager) {
     super(nodeManager, stateManager);
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java
similarity index 83%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java
index 4021575..3a772e5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java
@@ -29,8 +29,9 @@ import java.util.NavigableSet;
 
 /**
  * Manages the state of pipelines in SCM.
+ * TODO Rename to PipelineStateManager once the old state manager is removed.
  */
-public interface PipelineStateManagerV2 {
+public interface StateManager {
 
   /**
    * Adding pipeline would be replicated to Ratis.
@@ -96,4 +97,24 @@ public interface PipelineStateManagerV2 {
                                    ContainerID containerID) throws IOException;
 
   void close() throws Exception;
+
+  // TODO remove legacy interfaces once we switch to Ratis-based replication.
+
+  void addPipeline(Pipeline pipeline) throws IOException;
+
+  Pipeline removePipeline(PipelineID pipelineID) throws IOException;
+
+  void updatePipelineState(PipelineID id, Pipeline.PipelineState newState)
+      throws IOException;
+
+  Pipeline finalizePipeline(PipelineID pipelineId)
+      throws IOException;
+
+  Pipeline openPipeline(PipelineID pipelineId) throws IOException;
+
+  void activatePipeline(PipelineID pipelineID)
+      throws IOException;
+
+  void deactivatePipeline(PipelineID pipelineID)
+      throws IOException;
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index f9fb150..e355877 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -36,29 +36,31 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
   private boolean autoOpenPipeline;
   private  boolean isHealthy;
 
-  public MockRatisPipelineProvider(NodeManager nodeManager,
-      PipelineStateManager stateManager, ConfigurationSource conf,
-      EventPublisher eventPublisher, boolean autoOpen) {
+  public MockRatisPipelineProvider(
+      NodeManager nodeManager, StateManager stateManager,
+      ConfigurationSource conf, EventPublisher eventPublisher,
+      boolean autoOpen) {
     super(nodeManager, stateManager, conf, eventPublisher);
     autoOpenPipeline = autoOpen;
   }
 
   public MockRatisPipelineProvider(NodeManager nodeManager,
-      PipelineStateManager stateManager,
+      StateManager stateManager,
       ConfigurationSource conf) {
     super(nodeManager, stateManager, conf, new EventQueue());
   }
 
   public MockRatisPipelineProvider(NodeManager nodeManager,
-      PipelineStateManager stateManager,
-      ConfigurationSource conf, boolean isHealthy) {
+                                   StateManager stateManager,
+                                   ConfigurationSource conf,
+                                   boolean isHealthy) {
     super(nodeManager, stateManager, conf, new EventQueue());
     this.isHealthy = isHealthy;
   }
 
-  public MockRatisPipelineProvider(NodeManager nodeManager,
-      PipelineStateManager stateManager, ConfigurationSource conf,
-      EventPublisher eventPublisher) {
+  public MockRatisPipelineProvider(
+      NodeManager nodeManager, StateManager stateManager,
+      ConfigurationSource conf, EventPublisher eventPublisher) {
     super(nodeManager, stateManager, conf, eventPublisher);
     autoOpenPipeline = true;
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelieManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelieManagerImpl.java
new file mode 100644
index 0000000..a2a8e25
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelieManagerImpl.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.UUID;
+
+/**
+ * Tests for PipelineManagerV2Impl.
+ */
+public class TestPipelieManagerImpl {
+  private PipelineManagerV2Impl pipelineManager;
+  private File testDir;
+  private DBStore dbStore;
+
+  @Before
+  public void init() throws Exception {
+    final OzoneConfiguration conf = SCMTestUtils.getConf();
+    testDir = GenericTestUtils.getTestDir(
+        TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
+    pipelineManager = PipelineManagerV2Impl.newPipelineManager(
+        conf, MockSCMHAManager.getInstance(),
+        new MockNodeManager(true, 20),
+        SCMDBDefinition.PIPELINES.getTable(dbStore), new EventQueue());
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    if (pipelineManager != null) {
+      pipelineManager.close();
+    }
+    if (dbStore != null) {
+      dbStore.close();
+    }
+    FileUtil.fullyDelete(testDir);
+  }
+
+  @Test
+  public void testCreatePipeline() throws Exception {
+    Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
+    pipelineManager.allowPipelineCreation();
+    Pipeline pipeline1 = pipelineManager.createPipeline(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
+    Assert.assertEquals(1, pipelineManager.getPipelines().size());
+    Assert.assertTrue(pipelineManager.containsPipeline(pipeline1.getId()));
+
+    Pipeline pipeline2 = pipelineManager.createPipeline(
+        HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.ONE);
+    Assert.assertEquals(2, pipelineManager.getPipelines().size());
+    Assert.assertTrue(pipelineManager.containsPipeline(pipeline2.getId()));
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
index 41eea3d..3320081 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
@@ -80,7 +80,7 @@ public class TestPipelineDatanodesIntersection {
     NodeManager nodeManager= new MockNodeManager(true, nodeCount);
     conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, nodeHeaviness);
     conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false);
-    PipelineStateManager stateManager = new PipelineStateManager();
+    StateManager stateManager = new PipelineStateManager();
     PipelineProvider provider = new MockRatisPipelineProvider(nodeManager,
         stateManager, conf);
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
index 6bff581..8252e2c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
@@ -37,7 +37,7 @@ import java.util.Set;
  */
 public class TestPipelineStateManager {
 
-  private PipelineStateManager stateManager;
+  private StateManager stateManager;
 
   @Before
   public void init() throws Exception {
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
index a8dd3c9..beed591 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
 import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -118,7 +117,7 @@ public class ReconPipelineManager extends SCMPipelineManager {
         if (!p.getPipelineState().equals(CLOSED)) {
           try {
             getStateManager().updatePipelineState(pipelineID, CLOSED);
-          } catch (PipelineNotFoundException e) {
+          } catch (IOException e) {
             LOG.warn("Pipeline {} not found while updating state. ",
                 p.getId(), e);
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org