Posted to commits@ozone.apache.org by sh...@apache.org on 2021/10/01 11:59:05 UTC

[ozone] branch master updated: HDDS-5762. remove pipelineStateManager V1 code (#2661)

This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 05eb172  HDDS-5762. remove pipelineStateManager V1 code (#2661)
05eb172 is described below

commit 05eb1726bb3b2f9f24ea1ac4640fa0874859f93a
Author: Jackson Yao <ja...@tencent.com>
AuthorDate: Fri Oct 1 19:58:50 2021 +0800

    HDDS-5762. remove pipelineStateManager V1 code (#2661)
---
 .../hadoop/hdds/scm/pipeline/PipelineFactory.java  |   2 +-
 .../hdds/scm/pipeline/PipelineManagerImpl.java     |  12 +-
 .../hdds/scm/pipeline/PipelinePlacementPolicy.java |   6 +-
 .../hadoop/hdds/scm/pipeline/PipelineProvider.java |   6 +-
 .../hdds/scm/pipeline/PipelineStateManager.java    | 266 +++++++--------------
 ...erV2Impl.java => PipelineStateManagerImpl.java} |  58 +----
 .../hdds/scm/pipeline/RatisPipelineProvider.java   |   2 +-
 .../hdds/scm/pipeline/RatisPipelineUtils.java      |   4 +-
 .../hdds/scm/pipeline/SimplePipelineProvider.java  |   2 +-
 .../hadoop/hdds/scm/pipeline/StateManager.java     | 127 ----------
 .../algorithms/DefaultLeaderChoosePolicy.java      |   4 +-
 .../choose/algorithms/LeaderChoosePolicy.java      |   8 +-
 .../algorithms/LeaderChoosePolicyFactory.java      |   8 +-
 .../algorithms/MinLeaderCountChoosePolicy.java     |   6 +-
 .../scm/container/TestContainerManagerImpl.java    |   6 +-
 .../hdds/scm/node/TestContainerPlacement.java      |  29 ++-
 .../hdds/scm/pipeline/MockPipelineManager.java     |  27 ++-
 .../scm/pipeline/MockRatisPipelineProvider.java    |   6 +-
 .../TestPipelineDatanodesIntersection.java         |  52 +++-
 .../scm/pipeline/TestPipelinePlacementPolicy.java  |  73 +++++-
 ...ager.java => TestPipelineStateManagerImpl.java} | 237 ++++++++++++------
 .../scm/pipeline/TestRatisPipelineProvider.java    |  66 ++++-
 ...TestSCMStoreImplWithOldPipelineIDKeyFormat.java | 209 ----------------
 .../scm/pipeline/TestSimplePipelineProvider.java   |  48 +++-
 .../choose/algorithms/TestLeaderChoosePolicy.java  |   6 +-
 .../ozone/recon/scm/ReconPipelineManager.java      |  10 +-
 26 files changed, 547 insertions(+), 733 deletions(-)
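
The gist of this change: the temporary StateManager interface is deleted, PipelineStateManager becomes the interface name (previously a concrete V1 class), and PipelineStateManagerV2Impl is renamed to PipelineStateManagerImpl. Callers now obtain the state manager through its builder, as the hunks below show. A minimal construction sketch assembled from the calls that appear in this patch, assuming pipelineStore, scmhaManager and nodeManager are initialized as in PipelineManagerImpl:

    PipelineStateManager stateManager = PipelineStateManagerImpl
        .newBuilder()
        .setPipelineStore(pipelineStore)   // Table<PipelineID, Pipeline>
        .setRatisServer(scmhaManager.getRatisServer())
        .setNodeManager(nodeManager)
        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
        .build();   // returns a replication-aware proxy, not the impl itself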

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 0fbabee..b58716b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -40,7 +40,7 @@ public class PipelineFactory {
 
   private Map<ReplicationType, PipelineProvider> providers;
 
-  PipelineFactory(NodeManager nodeManager, StateManager stateManager,
+  PipelineFactory(NodeManager nodeManager, PipelineStateManager stateManager,
                   ConfigurationSource conf, EventPublisher eventPublisher,
                   SCMContext scmContext) {
     providers = new HashMap<>();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index c6651f4..8d37144 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -70,7 +70,7 @@ public class PipelineManagerImpl implements PipelineManager {
   // Limit the number of on-going ratis operation to be 1.
   private final ReentrantReadWriteLock lock;
   private PipelineFactory pipelineFactory;
-  private StateManager stateManager;
+  private PipelineStateManager stateManager;
   private BackgroundPipelineCreatorV2 backgroundPipelineCreator;
   private final ConfigurationSource conf;
   private final EventPublisher eventPublisher;
@@ -88,7 +88,7 @@ public class PipelineManagerImpl implements PipelineManager {
   protected PipelineManagerImpl(ConfigurationSource conf,
                                 SCMHAManager scmhaManager,
                                 NodeManager nodeManager,
-                                StateManager pipelineStateManager,
+                                PipelineStateManager pipelineStateManager,
                                 PipelineFactory pipelineFactory,
                                 EventPublisher eventPublisher,
                                 SCMContext scmContext) {
@@ -118,8 +118,8 @@ public class PipelineManagerImpl implements PipelineManager {
       EventPublisher eventPublisher,
       SCMContext scmContext,
       SCMServiceManager serviceManager) throws IOException {
-    // Create PipelineStateManager
-    StateManager stateManager = PipelineStateManagerV2Impl
+    // Create PipelineStateManagerImpl
+    PipelineStateManager stateManager = PipelineStateManagerImpl
         .newBuilder().setPipelineStore(pipelineStore)
         .setRatisServer(scmhaManager.getRatisServer())
         .setNodeManager(nodeManager)
@@ -553,7 +553,7 @@ public class PipelineManagerImpl implements PipelineManager {
     try {
       stateManager.close();
     } catch (Exception ex) {
-      LOG.error("PipelineStateManager close failed", ex);
+      LOG.error("PipelineStateManagerImpl close failed", ex);
     }
   }
 
@@ -569,7 +569,7 @@ public class PipelineManagerImpl implements PipelineManager {
   }
 
   @VisibleForTesting
-  public StateManager getStateManager() {
+  public PipelineStateManager getStateManager() {
     return stateManager;
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 361fd83..5782fbe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -55,7 +55,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
   static final Logger LOG =
       LoggerFactory.getLogger(PipelinePlacementPolicy.class);
   private final NodeManager nodeManager;
-  private final StateManager stateManager;
+  private final PipelineStateManager stateManager;
   private final ConfigurationSource conf;
   private final int heavyNodeCriteria;
   private static final int REQUIRED_RACKS = 2;
@@ -70,11 +70,11 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
    * load balancing and rack awareness.
    *
    * @param nodeManager NodeManager
-   * @param stateManager PipelineStateManager
+   * @param stateManager PipelineStateManagerImpl
    * @param conf        Configuration
    */
   public PipelinePlacementPolicy(final NodeManager nodeManager,
-                                 final StateManager stateManager,
+                                 final PipelineStateManager stateManager,
                                  final ConfigurationSource conf) {
     super(nodeManager, conf);
     this.nodeManager = nodeManager;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index 92b30b6..3c00906 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -41,10 +41,10 @@ public abstract class PipelineProvider<REPLICATION_CONFIG
   private static final Logger LOG =
       LoggerFactory.getLogger(PipelineProvider.class);
   private final NodeManager nodeManager;
-  private final StateManager stateManager;
+  private final PipelineStateManager stateManager;
 
   public PipelineProvider(NodeManager nodeManager,
-      StateManager stateManager) {
+      PipelineStateManager stateManager) {
     this.nodeManager = nodeManager;
     this.stateManager = stateManager;
   }
@@ -58,7 +58,7 @@ public abstract class PipelineProvider<REPLICATION_CONFIG
     return nodeManager;
   }
 
-  public StateManager getPipelineStateManager() {
+  public PipelineStateManager getPipelineStateManager() {
     return stateManager;
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index c3df3d2..ed4ddde 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -1,19 +1,18 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
  */
 
 package org.apache.hadoop.hdds.scm.pipeline;
@@ -22,10 +21,8 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
 import org.apache.hadoop.hdds.utils.db.Table;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -33,178 +30,77 @@ import java.util.List;
 import java.util.NavigableSet;
 
 /**
- * Manages the state of pipelines in SCM. All write operations like pipeline
- * creation, removal and updates should come via SCMPipelineManager.
- * PipelineStateMap class holds the data structures related to pipeline and its
- * state. All the read and write operations in PipelineStateMap are protected
- * by a read write lock.
+ * Manages the state of pipelines in SCM.
  */
-public class PipelineStateManager implements StateManager {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(PipelineStateManager.class);
-
-  private final PipelineStateMap pipelineStateMap;
-
-  public PipelineStateManager() {
-    this.pipelineStateMap = new PipelineStateMap();
-  }
-  @Override
-  public void addPipeline(Pipeline pipeline) throws IOException {
-    pipelineStateMap.addPipeline(pipeline);
-    LOG.info("Created pipeline {}", pipeline);
-  }
-
-  @Override
-  public void addContainerToPipeline(PipelineID pipelineId,
-                                     ContainerID containerID)
-      throws IOException {
-    pipelineStateMap.addContainerToPipeline(pipelineId, containerID);
-  }
-
-  @Override
-  public Pipeline getPipeline(PipelineID pipelineID)
-      throws PipelineNotFoundException {
-    return pipelineStateMap.getPipeline(pipelineID);
-  }
-
-  @Override
-  public List<Pipeline> getPipelines() {
-    return pipelineStateMap.getPipelines();
-  }
-
-  @Override
-  public List<Pipeline> getPipelines(
-      ReplicationConfig replicationConfig
-  ) {
-    return pipelineStateMap.getPipelines(replicationConfig);
-  }
-
-  @Override
-  public List<Pipeline> getPipelines(
-      ReplicationConfig replicationConfig,
-      PipelineState state
-  ) {
-    return pipelineStateMap.getPipelines(replicationConfig, state);
-  }
+public interface PipelineStateManager {
 
-  @Override
-  public List<Pipeline> getPipelines(
-      ReplicationConfig replicationConfig,
-      PipelineState state, Collection<DatanodeDetails> excludeDns,
-      Collection<PipelineID> excludePipelines) {
-    return pipelineStateMap
-        .getPipelines(replicationConfig, state, excludeDns, excludePipelines);
-  }
-
-  @Override
-  public NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
-      throws IOException {
-    return pipelineStateMap.getContainers(pipelineID);
-  }
-
-  @Override
-  public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
-    return pipelineStateMap.getNumberOfContainers(pipelineID);
-  }
-
-  @Override
-  public Pipeline removePipeline(PipelineID pipelineID) throws IOException {
-    Pipeline pipeline = pipelineStateMap.removePipeline(pipelineID);
-    LOG.info("Pipeline {} removed from db", pipeline);
-    return pipeline;
-  }
-
-  @Override
-  public void removeContainerFromPipeline(PipelineID pipelineID,
-      ContainerID containerID) throws IOException {
-    pipelineStateMap.removeContainerFromPipeline(pipelineID, containerID);
-  }
-
-  @Override
-  public Pipeline finalizePipeline(PipelineID pipelineId)
-      throws IOException {
-    Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId);
-    if (!pipeline.isClosed()) {
-      pipeline = pipelineStateMap
-          .updatePipelineState(pipelineId, PipelineState.CLOSED);
-      LOG.info("Pipeline {} moved to CLOSED state", pipeline);
-    }
-    return pipeline;
-  }
-
-  @Override
-  public Pipeline openPipeline(PipelineID pipelineId) throws IOException {
-    Pipeline pipeline = pipelineStateMap.getPipeline(pipelineId);
-    if (pipeline.isClosed()) {
-      throw new IOException("Closed pipeline can not be opened");
-    }
-    if (pipeline.getPipelineState() == PipelineState.ALLOCATED) {
-      LOG.info("Pipeline {} moved to OPEN state", pipeline);
-      pipeline = pipelineStateMap
-          .updatePipelineState(pipelineId, PipelineState.OPEN);
-    }
-    return pipeline;
-  }
+  /**
+   * Adding pipeline would be replicated to Ratis.
+   * @param pipelineProto
+   * @throws IOException
+   */
+  @Replicate
+  void addPipeline(HddsProtos.Pipeline pipelineProto) throws IOException;
 
   /**
-   * Activates a dormant pipeline.
-   *
-   * @param pipelineID ID of the pipeline to activate.
-   * @throws IOException in case of any Exception
+   * Removing pipeline would be replicated to Ratis.
+   * @param pipelineIDProto
+   * @return Pipeline removed
+   * @throws IOException
    */
-  @Override
-  public void activatePipeline(PipelineID pipelineID)
-      throws IOException {
-    pipelineStateMap
-        .updatePipelineState(pipelineID, PipelineState.OPEN);
-  }
+  @Replicate
+  void removePipeline(HddsProtos.PipelineID pipelineIDProto)
+      throws IOException;
 
   /**
-   * Deactivates an active pipeline.
-   *
-   * @param pipelineID ID of the pipeline to deactivate.
-   * @throws IOException in case of any Exception
+   * Updating pipeline state would be replicated to Ratis.
+   * @param pipelineIDProto
+   * @param newState
+   * @throws IOException
    */
-  @Override
-  public void deactivatePipeline(PipelineID pipelineID)
-      throws IOException {
-    pipelineStateMap
-        .updatePipelineState(pipelineID, PipelineState.DORMANT);
-  }
-
-  @Override
-  public void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
-      throws IOException {
-  }
-
-  @Override
-  public void updatePipelineState(PipelineID id, PipelineState newState)
-      throws PipelineNotFoundException {
-    pipelineStateMap.updatePipelineState(id, newState);
-  }
-
-  @Override
-  public void addPipeline(HddsProtos.Pipeline pipelineProto)
-      throws IOException {
-    throw new IOException("Not supported.");
-  }
-
-  @Override
-  public void removePipeline(HddsProtos.PipelineID pipelineIDProto)
-      throws IOException {
-    throw new IOException("Not supported.");
-  }
-
-  @Override
-  public void updatePipelineState(
-      HddsProtos.PipelineID pipelineIDProto, HddsProtos.PipelineState newState)
-      throws IOException {
-    throw new IOException("Not supported.");
-  }
-
-  @Override
-  public void close() {
-    // Do nothing
-  }
+  @Replicate
+  void updatePipelineState(
+      HddsProtos.PipelineID pipelineIDProto,
+      HddsProtos.PipelineState newState
+  )
+      throws IOException;
+
+  void addContainerToPipeline(
+      PipelineID pipelineID,
+      ContainerID containerID
+  ) throws IOException;
+
+  Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException;
+
+  List<Pipeline> getPipelines();
+
+  List<Pipeline> getPipelines(
+      ReplicationConfig replicationConfig
+  );
+
+  List<Pipeline> getPipelines(
+      ReplicationConfig replicationConfig,
+      Pipeline.PipelineState state
+  );
+
+  List<Pipeline> getPipelines(
+      ReplicationConfig replicationConfig,
+      Pipeline.PipelineState state,
+      Collection<DatanodeDetails> excludeDns,
+      Collection<PipelineID> excludePipelines
+  );
+
+  NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
+      throws IOException;
+
+  int getNumberOfContainers(PipelineID pipelineID) throws IOException;
+
+  void removeContainerFromPipeline(PipelineID pipelineID,
+                                   ContainerID containerID) throws IOException;
+
+  void close() throws Exception;
+
+  void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
+      throws IOException;
+
 }
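
Note that the mutating methods on this interface are proto-first: addPipeline, removePipeline and updatePipelineState take HddsProtos messages so the @Replicate proxy can serialize them into Ratis log entries, while the read methods keep the rich in-memory types. A caller-side sketch using the conversion the updated tests below perform (pipeline is assumed to be a fully built Pipeline):

    // Mutations go through protobuf, since @Replicate methods are
    // shipped over Ratis before being applied locally.
    HddsProtos.Pipeline pipelineProto =
        pipeline.getProtobufMessage(ClientVersions.CURRENT_VERSION);
    stateManager.addPipeline(pipelineProto);

    // Reads are served from the local PipelineStateMap with rich types.
    Pipeline stored = stateManager.getPipeline(pipeline.getId());
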
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java
similarity index 88%
rename from hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
rename to hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java
index 3f748e9..c9b1d77 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerImpl.java
@@ -46,10 +46,10 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
  * state. All the read and write operations in PipelineStateMap are protected
  * by a read write lock.
  */
-public class PipelineStateManagerV2Impl implements StateManager {
+public class PipelineStateManagerImpl implements PipelineStateManager {
 
   private static final Logger LOG =
-      LoggerFactory.getLogger(PipelineStateManager.class);
+      LoggerFactory.getLogger(PipelineStateManagerImpl.class);
 
   private PipelineStateMap pipelineStateMap;
   private final NodeManager nodeManager;
@@ -60,7 +60,7 @@ public class PipelineStateManagerV2Impl implements StateManager {
   // See https://issues.apache.org/jira/browse/HDDS-4560
   private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-  public PipelineStateManagerV2Impl(
+  public PipelineStateManagerImpl(
       Table<PipelineID, Pipeline> pipelineStore, NodeManager nodeManager,
       DBTransactionBuffer buffer) throws IOException {
     this.pipelineStateMap = new PipelineStateMap();
@@ -286,46 +286,6 @@ public class PipelineStateManagerV2Impl implements StateManager {
     }
   }
 
-  // TODO Remove legacy
-  @Override
-  public void addPipeline(Pipeline pipeline) throws IOException {
-    throw new IOException("Not supported.");
-  }
-
-  @Override
-  public Pipeline removePipeline(PipelineID pipelineID) throws IOException {
-    throw new IOException("Not supported.");
-  }
-
-  @Override
-  public void updatePipelineState(PipelineID id,
-                                  Pipeline.PipelineState newState)
-      throws IOException {
-    throw new IOException("Not supported.");
-  }
-
-  @Override
-  public Pipeline finalizePipeline(PipelineID pipelineId)
-      throws IOException {
-    throw new IOException("Not supported.");
-  }
-
-
-  @Override
-  public Pipeline openPipeline(PipelineID pipelineId) throws IOException {
-    throw new IOException("Not supported.");
-  }
-
-  @Override
-  public void activatePipeline(PipelineID pipelineID) throws IOException {
-    throw new IOException("Not supported.");
-  }
-
-  @Override
-  public void deactivatePipeline(PipelineID pipelineID) throws IOException {
-    throw new IOException("Not supported.");
-  }
-
   @Override
   public void reinitialize(Table<PipelineID, Pipeline> store)
       throws IOException {
@@ -349,7 +309,7 @@ public class PipelineStateManagerV2Impl implements StateManager {
   }
 
   /**
-   * Builder for PipelineStateManager.
+   * Builder for PipelineStateManagerImpl.
    */
   public static class Builder {
     private Table<PipelineID, Pipeline> pipelineStore;
@@ -378,20 +338,20 @@ public class PipelineStateManagerV2Impl implements StateManager {
       return this;
     }
 
-    public StateManager build() throws IOException {
+    public PipelineStateManager build() throws IOException {
       Preconditions.checkNotNull(pipelineStore);
 
-      final StateManager pipelineStateManager =
-          new PipelineStateManagerV2Impl(
+      final PipelineStateManager pipelineStateManager =
+          new PipelineStateManagerImpl(
               pipelineStore, nodeManager, transactionBuffer);
 
       final SCMHAInvocationHandler invocationHandler =
           new SCMHAInvocationHandler(SCMRatisProtocol.RequestType.PIPELINE,
               pipelineStateManager, scmRatisServer);
 
-      return (StateManager) Proxy.newProxyInstance(
+      return (PipelineStateManager) Proxy.newProxyInstance(
           SCMHAInvocationHandler.class.getClassLoader(),
-          new Class<?>[]{StateManager.class}, invocationHandler);
+          new Class<?>[]{PipelineStateManager.class}, invocationHandler);
     }
   }
 }
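
The build() above wraps PipelineStateManagerImpl in a JDK dynamic proxy, so calls to methods annotated with @Replicate are intercepted by SCMHAInvocationHandler and submitted to Ratis before state is mutated, while unannotated methods run locally. A self-contained sketch of that dispatch pattern; KvStateManager, LocalKvStateManager and the println are illustrative stand-ins, not Ozone code:

    import java.lang.annotation.Retention;
    import java.lang.annotation.RetentionPolicy;
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Proxy;
    import java.util.HashMap;
    import java.util.Map;

    public final class ReplicateProxyDemo {

      // Stand-in for org.apache.hadoop.hdds.scm.metadata.Replicate.
      @Retention(RetentionPolicy.RUNTIME)
      @interface Replicate { }

      interface KvStateManager {
        @Replicate
        void put(String key, String value) throws Exception; // replicated
        String get(String key);                              // local read
      }

      static final class LocalKvStateManager implements KvStateManager {
        private final Map<String, String> map = new HashMap<>();
        public void put(String key, String value) { map.put(key, value); }
        public String get(String key) { return map.get(key); }
      }

      static KvStateManager wrap(KvStateManager local) {
        InvocationHandler handler = (proxy, method, args) -> {
          if (method.isAnnotationPresent(Replicate.class)) {
            // The real SCMHAInvocationHandler submits the call to the SCM
            // Ratis server here; only the state-machine apply path mutates
            // state on each replica.
            System.out.println("replicating " + method.getName() + " via Ratis");
          }
          return method.invoke(local, args);
        };
        return (KvStateManager) Proxy.newProxyInstance(
            KvStateManager.class.getClassLoader(),
            new Class<?>[]{KvStateManager.class}, handler);
      }

      public static void main(String[] args) throws Exception {
        KvStateManager sm = wrap(new LocalKvStateManager());
        sm.put("pipeline-1", "OPEN");      // hits the replicate branch
        System.out.println(sm.get("pipeline-1"));
      }
    }
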
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 9cc3954..d7a8067 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -65,7 +65,7 @@ public class RatisPipelineProvider
 
   @VisibleForTesting
   public RatisPipelineProvider(NodeManager nodeManager,
-                               StateManager stateManager,
+                               PipelineStateManager stateManager,
                                ConfigurationSource conf,
                                EventPublisher eventPublisher,
                                SCMContext scmContext) {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index ed35bcb..847b50e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -103,12 +103,12 @@ public final class RatisPipelineUtils {
    * Return the list of pipelines who share the same set of datanodes
    * with the input pipeline.
    *
-   * @param stateManager PipelineStateManager
+   * @param stateManager PipelineStateManagerImpl
    * @param pipeline input pipeline
    * @return list of matched pipeline
    */
   static List<Pipeline> checkPipelineContainSameDatanodes(
-      StateManager stateManager, Pipeline pipeline) {
+      PipelineStateManager stateManager, Pipeline pipeline) {
     return stateManager
         .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE))
         .stream().filter(p -> !p.getId().equals(pipeline.getId()) &&
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index 147f773..43af98b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -34,7 +34,7 @@ public class SimplePipelineProvider
     extends PipelineProvider<StandaloneReplicationConfig> {
 
   public SimplePipelineProvider(NodeManager nodeManager,
-      StateManager stateManager) {
+      PipelineStateManager stateManager) {
     super(nodeManager, stateManager);
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java
deleted file mode 100644
index c075dea..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- * <p>
- * <p>http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * <p>Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.metadata.Replicate;
-import org.apache.hadoop.hdds.utils.db.Table;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.List;
-import java.util.NavigableSet;
-
-/**
- * Manages the state of pipelines in SCM.
- * TODO Rename to PipelineStateManager once the old state manager is removed.
- */
-public interface StateManager {
-
-  /**
-   * Adding pipeline would be replicated to Ratis.
-   * @param pipelineProto
-   * @throws IOException
-   */
-  @Replicate
-  void addPipeline(HddsProtos.Pipeline pipelineProto) throws IOException;
-
-  /**
-   * Removing pipeline would be replicated to Ratis.
-   * @param pipelineIDProto
-   * @return Pipeline removed
-   * @throws IOException
-   */
-  @Replicate
-  void removePipeline(HddsProtos.PipelineID pipelineIDProto)
-      throws IOException;
-
-  /**
-   * Updating pipeline state would be replicated to Ratis.
-   * @param pipelineIDProto
-   * @param newState
-   * @throws IOException
-   */
-  @Replicate
-  void updatePipelineState(
-      HddsProtos.PipelineID pipelineIDProto,
-      HddsProtos.PipelineState newState
-  )
-      throws IOException;
-
-  void addContainerToPipeline(
-      PipelineID pipelineID,
-      ContainerID containerID
-  ) throws IOException;
-
-  Pipeline getPipeline(PipelineID pipelineID) throws PipelineNotFoundException;
-
-  List<Pipeline> getPipelines();
-
-  List<Pipeline> getPipelines(
-      ReplicationConfig replicationConfig
-  );
-
-  List<Pipeline> getPipelines(
-      ReplicationConfig replicationConfig,
-      Pipeline.PipelineState state
-  );
-
-  List<Pipeline> getPipelines(
-      ReplicationConfig replicationConfig,
-      Pipeline.PipelineState state,
-      Collection<DatanodeDetails> excludeDns,
-      Collection<PipelineID> excludePipelines
-  );
-
-  NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
-      throws IOException;
-
-  int getNumberOfContainers(PipelineID pipelineID) throws IOException;
-
-  void removeContainerFromPipeline(PipelineID pipelineID,
-                                   ContainerID containerID) throws IOException;
-
-  void close() throws Exception;
-
-  // TODO remove legacy interfaces once we switch to Ratis based.
-
-  void addPipeline(Pipeline pipeline) throws IOException;
-
-  Pipeline removePipeline(PipelineID pipelineID) throws IOException;
-
-  void updatePipelineState(PipelineID id, Pipeline.PipelineState newState)
-      throws IOException;
-
-  Pipeline finalizePipeline(PipelineID pipelineId)
-      throws IOException;
-
-  Pipeline openPipeline(PipelineID pipelineId) throws IOException;
-
-  void activatePipeline(PipelineID pipelineID)
-      throws IOException;
-
-  void deactivatePipeline(PipelineID pipelineID)
-      throws IOException;
-
-  void reinitialize(Table<PipelineID, Pipeline> pipelineStore)
-      throws IOException;
-
-}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java
index 0b49ed8..415cf10 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/DefaultLeaderChoosePolicy.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipeline.StateManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
 
 import java.util.List;
 
@@ -31,7 +31,7 @@ import java.util.List;
 public class DefaultLeaderChoosePolicy extends LeaderChoosePolicy {
 
   public DefaultLeaderChoosePolicy(
-      NodeManager nodeManager, StateManager pipelineStateManager) {
+      NodeManager nodeManager, PipelineStateManager pipelineStateManager) {
     super(nodeManager, pipelineStateManager);
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java
index ada7702..04c155b 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicy.java
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdds.scm.pipeline.leader.choose.algorithms;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipeline.StateManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
 
 import java.util.List;
 
@@ -29,10 +29,10 @@ import java.util.List;
 public abstract class LeaderChoosePolicy {
 
   private final NodeManager nodeManager;
-  private final StateManager pipelineStateManager;
+  private final PipelineStateManager pipelineStateManager;
 
   public LeaderChoosePolicy(
-      NodeManager nodeManager, StateManager pipelineStateManager) {
+      NodeManager nodeManager, PipelineStateManager pipelineStateManager) {
     this.nodeManager = nodeManager;
     this.pipelineStateManager = pipelineStateManager;
   }
@@ -49,7 +49,7 @@ public abstract class LeaderChoosePolicy {
     return nodeManager;
   }
 
-  protected StateManager getPipelineStateManager() {
+  protected PipelineStateManager getPipelineStateManager() {
     return pipelineStateManager;
   }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java
index 03d676e..af54d8f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/LeaderChoosePolicyFactory.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipeline.StateManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,7 +45,7 @@ public final class LeaderChoosePolicyFactory {
 
   public static LeaderChoosePolicy getPolicy(
       ConfigurationSource conf, final NodeManager nodeManager,
-      final StateManager pipelineStateManager) throws SCMException {
+      final PipelineStateManager pipelineStateManager) throws SCMException {
     final Class<? extends LeaderChoosePolicy> policyClass = conf
         .getClass(ScmConfigKeys.OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY,
             OZONE_SCM_PIPELINE_LEADER_CHOOSING_POLICY_DEFAULT,
@@ -53,12 +53,12 @@ public final class LeaderChoosePolicyFactory {
     Constructor<? extends LeaderChoosePolicy> constructor;
     try {
       constructor = policyClass.getDeclaredConstructor(NodeManager.class,
-          StateManager.class);
+          PipelineStateManager.class);
       LOG.info("Create leader choose policy of type {}",
           policyClass.getCanonicalName());
     } catch (NoSuchMethodException e) {
       String msg = "Failed to find constructor(NodeManager, " +
-          "PipelineStateManager) for class " +
+          "PipelineStateManagerImpl) for class " +
           policyClass.getCanonicalName();
       LOG.error(msg);
       throw new SCMException(msg,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java
index 8cb1df1..d4068b9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/MinLeaderCountChoosePolicy.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.scm.pipeline.StateManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,7 +41,7 @@ public class MinLeaderCountChoosePolicy extends LeaderChoosePolicy {
       LoggerFactory.getLogger(MinLeaderCountChoosePolicy.class);
 
   public MinLeaderCountChoosePolicy(
-      NodeManager nodeManager, StateManager pipelineStateManager) {
+      NodeManager nodeManager, PipelineStateManager pipelineStateManager) {
     super(nodeManager, pipelineStateManager);
   }
 
@@ -66,7 +66,7 @@ public class MinLeaderCountChoosePolicy extends LeaderChoosePolicy {
 
   private Map<DatanodeDetails, Integer> getSuggestedLeaderCount(
       List<DatanodeDetails> dns, NodeManager nodeManager,
-      StateManager pipelineStateManager) {
+      PipelineStateManager pipelineStateManager) {
     Map<DatanodeDetails, Integer> suggestedLeaderCount = new HashMap<>();
     for (DatanodeDetails dn : dns) {
       suggestedLeaderCount.put(dn, 0);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
index bf7af97..c6bb279 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestContainerManagerImpl.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
 import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
 import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.MockPipelineManager;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
 import org.apache.hadoop.hdds.utils.db.DBStore;
@@ -53,6 +54,7 @@ public class TestContainerManagerImpl {
   private ContainerManager containerManager;
   private SCMHAManager scmhaManager;
   private SequenceIdGenerator sequenceIdGen;
+  private NodeManager nodeManager;
 
   @Before
   public void setUp() throws Exception {
@@ -63,9 +65,11 @@ public class TestContainerManagerImpl {
     dbStore = DBStoreBuilder.createDBStore(
         conf, new SCMDBDefinition());
     scmhaManager = MockSCMHAManager.getInstance(true);
+    nodeManager = new MockNodeManager(true, 10);
     sequenceIdGen = new SequenceIdGenerator(
         conf, scmhaManager, SCMDBDefinition.SEQUENCE_ID.getTable(dbStore));
-    final PipelineManager pipelineManager = MockPipelineManager.getInstance();
+    final PipelineManager pipelineManager =
+        new MockPipelineManager(dbStore, scmhaManager, nodeManager);
     pipelineManager.createPipeline(new RatisReplicationConfig(
         ReplicationFactor.THREE));
     containerManager = new ContainerManagerImpl(conf,
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 2443345..f17ea5e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientManager;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
 import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
 import org.apache.hadoop.hdds.scm.events.SCMEvents;
@@ -86,6 +87,7 @@ public class TestContainerPlacement {
   private SequenceIdGenerator sequenceIdGen;
   private OzoneConfiguration conf;
   private PipelineManager pipelineManager;
+  private NodeManager nodeManager;
 
   @Rule
   public ExpectedException thrown = ExpectedException.none();
@@ -101,7 +103,9 @@ public class TestContainerPlacement {
     scmhaManager = MockSCMHAManager.getInstance(true);
     sequenceIdGen = new SequenceIdGenerator(
         conf, scmhaManager, SCMDBDefinition.SEQUENCE_ID.getTable(dbStore));
-    pipelineManager = MockPipelineManager.getInstance();
+    nodeManager = new MockNodeManager(true, 10);
+    pipelineManager = new MockPipelineManager(dbStore,
+        scmhaManager, nodeManager);
     pipelineManager.createPipeline(new RatisReplicationConfig(
         HddsProtos.ReplicationFactor.THREE));
   }
@@ -149,9 +153,9 @@ public class TestContainerPlacement {
         .thenReturn(maxLayoutVersion());
     Mockito.when(versionManager.getSoftwareLayoutVersion())
         .thenReturn(maxLayoutVersion());
-    SCMNodeManager nodeManager = new SCMNodeManager(config, storageConfig,
+    SCMNodeManager scmNodeManager = new SCMNodeManager(config, storageConfig,
         eventQueue, null, SCMContext.emptyContext(), versionManager);
-    return nodeManager;
+    return scmNodeManager;
   }
 
   ContainerManager createContainerManager()
@@ -183,29 +187,30 @@ public class TestContainerPlacement {
     conf.setClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
         SCMContainerPlacementCapacity.class, PlacementPolicy.class);
 
-    SCMNodeManager nodeManager = createNodeManager(conf);
+    SCMNodeManager scmNodeManager = createNodeManager(conf);
     containerManager = createContainerManager();
     List<DatanodeDetails> datanodes =
-        TestUtils.getListOfRegisteredDatanodeDetails(nodeManager, nodeCount);
+        TestUtils.getListOfRegisteredDatanodeDetails(scmNodeManager, nodeCount);
     XceiverClientManager xceiverClientManager = null;
-    LayoutVersionManager versionManager = nodeManager.getLayoutVersionManager();
+    LayoutVersionManager versionManager =
+        scmNodeManager.getLayoutVersionManager();
     LayoutVersionProto layoutInfo =
         toLayoutVersionProto(versionManager.getMetadataLayoutVersion(),
             versionManager.getSoftwareLayoutVersion());
     try {
       for (DatanodeDetails datanodeDetails : datanodes) {
-        nodeManager.processHeartbeat(datanodeDetails, layoutInfo);
+        scmNodeManager.processHeartbeat(datanodeDetails, layoutInfo);
       }
 
       //TODO: wait for heartbeat to be processed
       Thread.sleep(4 * 1000);
-      assertEquals(nodeCount, nodeManager.getNodeCount(null, HEALTHY));
+      assertEquals(nodeCount, scmNodeManager.getNodeCount(null, HEALTHY));
       assertEquals(capacity * nodeCount,
-          (long) nodeManager.getStats().getCapacity().get());
+          (long) scmNodeManager.getStats().getCapacity().get());
       assertEquals(used * nodeCount,
-          (long) nodeManager.getStats().getScmUsed().get());
+          (long) scmNodeManager.getStats().getScmUsed().get());
       assertEquals(remaining * nodeCount,
-          (long) nodeManager.getStats().getRemaining().get());
+          (long) scmNodeManager.getStats().getRemaining().get());
 
       xceiverClientManager= new XceiverClientManager(conf);
 
@@ -220,7 +225,7 @@ public class TestContainerPlacement {
               container.containerID()).size());
     } finally {
       IOUtils.closeQuietly(containerManager);
-      IOUtils.closeQuietly(nodeManager);
+      IOUtils.closeQuietly(scmNodeManager);
       if (xceiverClientManager != null) {
         xceiverClientManager.close();
       }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index fce38a1..b155f28 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -21,7 +21,12 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.utils.db.DBStore;
 import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.ozone.ClientVersions;
 
 import java.io.IOException;
 import java.util.Collection;
@@ -38,12 +43,14 @@ public final class MockPipelineManager implements PipelineManager {
 
   private PipelineStateManager stateManager;
 
-  public static PipelineManager getInstance() {
-    return new MockPipelineManager();
-  }
-
-  private MockPipelineManager() {
-    this.stateManager = new PipelineStateManager();
+  public MockPipelineManager(DBStore dbStore, SCMHAManager scmhaManager,
+                             NodeManager nodeManager) throws IOException {
+    stateManager = PipelineStateManagerImpl
+        .newBuilder().setNodeManager(nodeManager)
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
   }
 
   @Override
@@ -59,7 +66,9 @@ public final class MockPipelineManager implements PipelineManager {
         .setNodes(nodes)
         .setState(Pipeline.PipelineState.OPEN)
         .build();
-    stateManager.addPipeline(pipeline);
+
+    stateManager.addPipeline(pipeline.getProtobufMessage(
+        ClientVersions.CURRENT_VERSION));
     return pipeline;
   }
 
@@ -145,13 +154,11 @@ public final class MockPipelineManager implements PipelineManager {
   @Override
   public void openPipeline(final PipelineID pipelineId)
       throws IOException {
-    stateManager.openPipeline(pipelineId);
   }
 
   @Override
   public void closePipeline(final Pipeline pipeline, final boolean onTimeout)
       throws IOException {
-    stateManager.finalizePipeline(pipeline.getId());
   }
 
   @Override
@@ -188,13 +195,11 @@ public final class MockPipelineManager implements PipelineManager {
   @Override
   public void activatePipeline(final PipelineID pipelineID)
       throws IOException {
-
   }
 
   @Override
   public void deactivatePipeline(final PipelineID pipelineID)
       throws IOException {
-    stateManager.deactivatePipeline(pipelineID);
   }
 
   @Override
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
index 663c2c7..4760c0e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockRatisPipelineProvider.java
@@ -38,7 +38,7 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
   private boolean autoOpenPipeline;
 
   public MockRatisPipelineProvider(
-      NodeManager nodeManager, StateManager stateManager,
+      NodeManager nodeManager, PipelineStateManager stateManager,
       ConfigurationSource conf, EventPublisher eventPublisher,
       boolean autoOpen) {
     super(nodeManager, stateManager,
@@ -47,14 +47,14 @@ public class MockRatisPipelineProvider extends RatisPipelineProvider {
   }
 
   public MockRatisPipelineProvider(NodeManager nodeManager,
-      StateManager stateManager,
+      PipelineStateManager stateManager,
       ConfigurationSource conf) {
     super(nodeManager, stateManager,
         conf, new EventQueue(), SCMContext.emptyContext());
   }
 
   public MockRatisPipelineProvider(
-      NodeManager nodeManager, StateManager stateManager,
+      NodeManager nodeManager, PipelineStateManager stateManager,
       ConfigurationSource conf, EventPublisher eventPublisher) {
     super(nodeManager, stateManager,
         conf, eventPublisher, SCMContext.emptyContext());
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
index 54692cd..fec2f2e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineDatanodesIntersection.java
@@ -18,13 +18,26 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.ClientVersions;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -33,10 +46,12 @@ import org.junit.runners.Parameterized;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
 
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE;
@@ -53,11 +68,27 @@ public class TestPipelineDatanodesIntersection {
   private int nodeHeaviness;
   private OzoneConfiguration conf;
   private boolean end;
+  private File testDir;
+  private DBStore dbStore;
 
   @Before
-  public void initialize() {
-    conf = new OzoneConfiguration();
+  public void initialize() throws IOException {
+    conf = SCMTestUtils.getConf();
     end = false;
+    testDir = GenericTestUtils.getTestDir(
+        TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    dbStore = DBStoreBuilder.createDBStore(
+        conf, new SCMDBDefinition());
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    if (dbStore != null) {
+      dbStore.close();
+    }
+
+    FileUtil.fullyDelete(testDir);
   }
 
   public TestPipelineDatanodesIntersection(int nodeCount, int nodeHeaviness) {
@@ -78,11 +109,20 @@ public class TestPipelineDatanodesIntersection {
   }
 
   @Test
-  public void testPipelineDatanodesIntersection() {
+  public void testPipelineDatanodesIntersection() throws IOException {
     NodeManager nodeManager= new MockNodeManager(true, nodeCount);
     conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, nodeHeaviness);
     conf.setBoolean(OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE, false);
-    StateManager stateManager = new PipelineStateManager();
+    SCMHAManager scmhaManager = MockSCMHAManager.getInstance(true);
+
+    PipelineStateManager stateManager = PipelineStateManagerImpl.newBuilder()
+        .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setNodeManager(nodeManager)
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
+
+
     PipelineProvider provider = new MockRatisPipelineProvider(nodeManager,
         stateManager, conf);
 
@@ -94,7 +134,9 @@ public class TestPipelineDatanodesIntersection {
       try {
         Pipeline pipeline = provider.create(new RatisReplicationConfig(
             ReplicationFactor.THREE));
-        stateManager.addPipeline(pipeline);
+        HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
+            ClientVersions.CURRENT_VERSION);
+        stateManager.addPipeline(pipelineProto);
         nodeManager.addPipeline(pipeline);
         List<Pipeline> overlapPipelines = RatisPipelineUtils
             .checkPipelineContainSameDatanodes(stateManager, pipeline);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 9b216a9..efb391e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -25,6 +26,8 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.stream.Collectors;
 
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.conf.StorageUnit;
@@ -35,7 +38,11 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.net.NetConstants;
 import org.apache.hadoop.hdds.scm.net.NetworkTopology;
@@ -46,14 +53,18 @@ import org.apache.hadoop.hdds.scm.net.NodeSchema;
 import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
 import org.apache.hadoop.hdds.scm.node.states.Node2PipelineMap;
 
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.ClientVersions;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 
@@ -77,29 +88,49 @@ public class TestPipelinePlacementPolicy {
   private NetworkTopologyImpl cluster;
   private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
   private static final int PIPELINE_LOAD_LIMIT = 5;
+  private File testDir;
+  private DBStore dbStore;
+  private SCMHAManager scmhaManager;
 
   private List<DatanodeDetails> nodesWithOutRackAwareness = new ArrayList<>();
   private List<DatanodeDetails> nodesWithRackAwareness = new ArrayList<>();
 
-  static final Logger LOG =
-      LoggerFactory.getLogger(TestPipelinePlacementPolicy.class);
-
   @Before
   public void init() throws Exception {
     cluster = initTopology();
     // start with nodes with rack awareness.
     nodeManager = new MockNodeManager(cluster, getNodesWithRackAwareness(),
         false, PIPELINE_PLACEMENT_MAX_NODES_COUNT);
-    conf = new OzoneConfiguration();
+    conf = SCMTestUtils.getConf();
     conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, PIPELINE_LOAD_LIMIT);
     conf.setStorageSize(OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN,
         10, StorageUnit.MB);
     nodeManager.setNumPipelinePerDatanode(PIPELINE_LOAD_LIMIT);
-    stateManager = new PipelineStateManager();
+    testDir = GenericTestUtils.getTestDir(
+        TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    dbStore = DBStoreBuilder.createDBStore(
+        conf, new SCMDBDefinition());
+    scmhaManager = MockSCMHAManager.getInstance(true);
+    stateManager = PipelineStateManagerImpl.newBuilder()
+        .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setNodeManager(nodeManager)
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
     placementPolicy = new PipelinePlacementPolicy(
         nodeManager, stateManager, conf);
   }
 
+  @After
+  public void cleanup() throws Exception {
+    if (dbStore != null) {
+      dbStore.close();
+    }
+
+    FileUtil.fullyDelete(testDir);
+  }
+
   private NetworkTopologyImpl initTopology() {
     NodeSchema[] schemas = new NodeSchema[]
         {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
@@ -152,7 +183,7 @@ public class TestPipelinePlacementPolicy {
   }
 
   @Test
-  public void testChooseNodeWithSingleNodeRack() throws SCMException {
+  public void testChooseNodeWithSingleNodeRack() throws IOException {
     // There is only one node on 3 racks altogether.
     List<DatanodeDetails> datanodes = new ArrayList<>();
     for (Node node : SINGLE_NODE_RACK) {
@@ -162,8 +193,16 @@ public class TestPipelinePlacementPolicy {
     }
     MockNodeManager localNodeManager = new MockNodeManager(initTopology(),
         datanodes, false, datanodes.size());
+
+    PipelineStateManager tempPipelineStateManager = PipelineStateManagerImpl
+        .newBuilder().setNodeManager(localNodeManager)
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
+
     PipelinePlacementPolicy localPlacementPolicy = new PipelinePlacementPolicy(
-        localNodeManager, new PipelineStateManager(), conf);
+        localNodeManager, tempPipelineStateManager, conf);
     int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
     List<DatanodeDetails> results = localPlacementPolicy.chooseDatanodes(
         new ArrayList<>(datanodes.size()),
@@ -181,7 +220,7 @@ public class TestPipelinePlacementPolicy {
   }
 
   @Test
-  public void testChooseNodeNotEnoughSpace() {
+  public void testChooseNodeNotEnoughSpace() throws IOException {
     // There is only one node on 3 racks altogether.
     List<DatanodeDetails> datanodes = new ArrayList<>();
     for (Node node : SINGLE_NODE_RACK) {
@@ -191,8 +230,16 @@ public class TestPipelinePlacementPolicy {
     }
     MockNodeManager localNodeManager = new MockNodeManager(initTopology(),
         datanodes, false, datanodes.size());
+
+    PipelineStateManager tempPipelineStateManager = PipelineStateManagerImpl
+        .newBuilder().setNodeManager(localNodeManager)
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
+
     PipelinePlacementPolicy localPlacementPolicy = new PipelinePlacementPolicy(
-        localNodeManager, new PipelineStateManager(), conf);
+        localNodeManager, tempPipelineStateManager, conf);
     int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
 
     String expectedMessageSubstring = "Unable to find enough nodes that meet " +
@@ -237,8 +284,10 @@ public class TestPipelinePlacementPolicy {
                 ReplicationFactor.THREE))
             .setNodes(nodes)
             .build();
+        HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
+            ClientVersions.CURRENT_VERSION);
         nodeManager.addPipeline(pipeline);
-        stateManager.addPipeline(pipeline);
+        stateManager.addPipeline(pipelineProto);
       } catch (SCMException e) {
         throw e;
         //break;
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManagerImpl.java
similarity index 64%
rename from hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
rename to hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManagerImpl.java
index 49d969b..aa9c590 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateManagerImpl.java
@@ -18,34 +18,78 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.ClientVersions;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 
 /**
- * Test for PipelineStateManager.
+ * Test for PipelineStateManagerImpl.
  */
-public class TestPipelineStateManager {
+public class TestPipelineStateManagerImpl {
 
-  private StateManager stateManager;
+  private PipelineStateManager stateManager;
+  private File testDir;
+  private DBStore dbStore;
 
   @Before
   public void init() throws Exception {
-    stateManager = new PipelineStateManager();
+    final OzoneConfiguration conf = SCMTestUtils.getConf();
+    testDir = GenericTestUtils.getTestDir(
+        TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    dbStore = DBStoreBuilder.createDBStore(
+        conf, new SCMDBDefinition());
+
+    SCMHAManager scmhaManager = MockSCMHAManager.getInstance(true);
+    NodeManager nodeManager = new MockNodeManager(true, 10);
+
+    stateManager = PipelineStateManagerImpl.newBuilder()
+        .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setNodeManager(nodeManager)
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    if (dbStore != null) {
+      dbStore.close();
+    }
+
+    FileUtil.fullyDelete(testDir);
   }
 
   private Pipeline createDummyPipeline(int numNodes) {
@@ -70,20 +114,24 @@ public class TestPipelineStateManager {
   @Test
   public void testAddAndGetPipeline() throws IOException {
     Pipeline pipeline = createDummyPipeline(0);
+    HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
+        ClientVersions.CURRENT_VERSION);
     try {
-      stateManager.addPipeline(pipeline);
+      stateManager.addPipeline(pipelineProto);
       Assert.fail("Pipeline should not have been added");
-    } catch (IllegalArgumentException e) {
+    } catch (StateMachineException e) {
       // replication factor and number of nodes in the pipeline do not match
       Assert.assertTrue(e.getMessage().contains("do not match"));
     }
 
     // add a pipeline
     pipeline = createDummyPipeline(1);
-    stateManager.addPipeline(pipeline);
+    pipelineProto = pipeline.getProtobufMessage(
+        ClientVersions.CURRENT_VERSION);
+    stateManager.addPipeline(pipelineProto);
 
     try {
-      stateManager.addPipeline(pipeline);
+      stateManager.addPipeline(pipelineProto);
       Assert.fail("Pipeline should not have been added");
     } catch (IOException e) {
       // Can not add a pipeline twice
@@ -92,10 +140,11 @@ public class TestPipelineStateManager {
 
     // verify pipeline returned is same
     Pipeline pipeline1 = stateManager.getPipeline(pipeline.getId());
-    Assert.assertTrue(pipeline == pipeline1);
+    Assert.assertEquals(pipeline.getId(), pipeline1.getId());
 
     // clean up
-    removePipeline(pipeline);
+    finalizePipeline(pipelineProto);
+    removePipeline(pipelineProto);
   }
 
   @Test
@@ -103,14 +152,14 @@ public class TestPipelineStateManager {
     // At the start there should be no pipelines
     Assert.assertTrue(stateManager.getPipelines().isEmpty());
 
-    Set<Pipeline> pipelines = new HashSet<>();
-    Pipeline pipeline = createDummyPipeline(1);
+    Set<HddsProtos.Pipeline> pipelines = new HashSet<>();
+    HddsProtos.Pipeline pipeline = createDummyPipeline(1).getProtobufMessage(
+        ClientVersions.CURRENT_VERSION);
     stateManager.addPipeline(pipeline);
-    stateManager.openPipeline(pipeline.getId());
     pipelines.add(pipeline);
-    pipeline = createDummyPipeline(1);
+    pipeline = createDummyPipeline(1).getProtobufMessage(
+        ClientVersions.CURRENT_VERSION);
     stateManager.addPipeline(pipeline);
-    stateManager.openPipeline(pipeline.getId());
     pipelines.add(pipeline);
 
     Set<Pipeline> pipelines1 = new HashSet<>(stateManager
@@ -121,35 +170,37 @@ public class TestPipelineStateManager {
     Assert.assertEquals(pipelines1.size(), pipelines.size());
 
     // clean up
-    for (Pipeline pipeline1 : pipelines) {
+    for (HddsProtos.Pipeline pipeline1 : pipelines) {
+      finalizePipeline(pipeline1);
       removePipeline(pipeline1);
     }
   }
 
   @Test
   public void testGetPipelinesByTypeAndFactor() throws IOException {
-    Set<Pipeline> pipelines = new HashSet<>();
+    Set<HddsProtos.Pipeline> pipelines = new HashSet<>();
     for (HddsProtos.ReplicationType type : new ReplicationType[] {
         ReplicationType.RATIS, ReplicationType.STAND_ALONE}) {
       for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
           .values()) {
         for (int i = 0; i < 5; i++) {
           // 5 pipelines in allocated state for each type and factor
-          Pipeline pipeline =
-              createDummyPipeline(type, factor, factor.getNumber());
+          HddsProtos.Pipeline pipeline =
+              createDummyPipeline(type, factor, factor.getNumber())
+                  .getProtobufMessage(ClientVersions.CURRENT_VERSION);
           stateManager.addPipeline(pipeline);
           pipelines.add(pipeline);
 
           // 5 pipelines in open state for each type and factor
-          pipeline = createDummyPipeline(type, factor, factor.getNumber());
+          pipeline = createDummyPipeline(type, factor, factor.getNumber())
+              .getProtobufMessage(ClientVersions.CURRENT_VERSION);
           stateManager.addPipeline(pipeline);
-          stateManager.openPipeline(pipeline.getId());
           pipelines.add(pipeline);
 
           // 5 pipelines in closed state for each type and factor
-          pipeline = createDummyPipeline(type, factor, factor.getNumber());
+          pipeline = createDummyPipeline(type, factor, factor.getNumber())
+              .getProtobufMessage(ClientVersions.CURRENT_VERSION);
           stateManager.addPipeline(pipeline);
-          stateManager.finalizePipeline(pipeline.getId());
           pipelines.add(pipeline);
         }
       }
@@ -171,42 +222,47 @@ public class TestPipelineStateManager {
     }
 
     //clean up
-    for (Pipeline pipeline : pipelines) {
+    for (HddsProtos.Pipeline pipeline : pipelines) {
+      finalizePipeline(pipeline);
       removePipeline(pipeline);
     }
   }
 
   @Test
   public void testGetPipelinesByTypeFactorAndState() throws IOException {
-    Set<Pipeline> pipelines = new HashSet<>();
+    Set<HddsProtos.Pipeline> pipelines = new HashSet<>();
     for (HddsProtos.ReplicationType type : new ReplicationType[] {
         ReplicationType.RATIS, ReplicationType.STAND_ALONE}) {
       for (HddsProtos.ReplicationFactor factor : HddsProtos.ReplicationFactor
           .values()) {
         for (int i = 0; i < 5; i++) {
           // 5 pipelines in allocated state for each type and factor
-          Pipeline pipeline =
-              createDummyPipeline(type, factor, factor.getNumber());
+          HddsProtos.Pipeline pipeline =
+              createDummyPipeline(type, factor, factor.getNumber())
+                  .getProtobufMessage(ClientVersions.CURRENT_VERSION);
           stateManager.addPipeline(pipeline);
           pipelines.add(pipeline);
 
           // 5 pipelines in open state for each type and factor
-          pipeline = createDummyPipeline(type, factor, factor.getNumber());
+          pipeline = createDummyPipeline(type, factor, factor.getNumber())
+              .getProtobufMessage(ClientVersions.CURRENT_VERSION);
           stateManager.addPipeline(pipeline);
-          stateManager.openPipeline(pipeline.getId());
+          openPipeline(pipeline);
           pipelines.add(pipeline);
 
           // 5 pipelines in dormant state for each type and factor
-          pipeline = createDummyPipeline(type, factor, factor.getNumber());
+          pipeline = createDummyPipeline(type, factor, factor.getNumber())
+              .getProtobufMessage(ClientVersions.CURRENT_VERSION);
           stateManager.addPipeline(pipeline);
-          stateManager.openPipeline(pipeline.getId());
-          stateManager.deactivatePipeline(pipeline.getId());
+          openPipeline(pipeline);
+          deactivatePipeline(pipeline);
           pipelines.add(pipeline);
 
           // 5 pipelines in closed state for each type and factor
-          pipeline = createDummyPipeline(type, factor, factor.getNumber());
+          pipeline = createDummyPipeline(type, factor, factor.getNumber())
+              .getProtobufMessage(ClientVersions.CURRENT_VERSION);
           stateManager.addPipeline(pipeline);
-          stateManager.finalizePipeline(pipeline.getId());
+          finalizePipeline(pipeline);
           pipelines.add(pipeline);
         }
       }
@@ -231,7 +287,8 @@ public class TestPipelineStateManager {
     }
 
     //clean up
-    for (Pipeline pipeline : pipelines) {
+    for (HddsProtos.Pipeline pipeline : pipelines) {
+      finalizePipeline(pipeline);
       removePipeline(pipeline);
     }
   }
@@ -240,13 +297,15 @@ public class TestPipelineStateManager {
   public void testAddAndGetContainer() throws IOException {
     long containerID = 0;
     Pipeline pipeline = createDummyPipeline(1);
-    stateManager.addPipeline(pipeline);
+    HddsProtos.Pipeline pipelineProto = pipeline
+        .getProtobufMessage(ClientVersions.CURRENT_VERSION);
+    stateManager.addPipeline(pipelineProto);
     pipeline = stateManager.getPipeline(pipeline.getId());
     stateManager.addContainerToPipeline(pipeline.getId(),
         ContainerID.valueOf(++containerID));
 
     // move pipeline to open state
-    stateManager.openPipeline(pipeline.getId());
+    openPipeline(pipelineProto);
     stateManager.addContainerToPipeline(pipeline.getId(),
         ContainerID.valueOf(++containerID));
     stateManager.addContainerToPipeline(pipeline.getId(),
@@ -257,7 +316,8 @@ public class TestPipelineStateManager {
         stateManager.getContainers(pipeline.getId());
     Assert.assertEquals(containerIDs.size(), containerID);
 
-    removePipeline(pipeline);
+    finalizePipeline(pipelineProto);
+    removePipeline(pipelineProto);
     try {
       stateManager.addContainerToPipeline(pipeline.getId(),
           ContainerID.valueOf(++containerID));
@@ -271,14 +331,16 @@ public class TestPipelineStateManager {
   @Test
   public void testRemovePipeline() throws IOException {
     Pipeline pipeline = createDummyPipeline(1);
-    stateManager.addPipeline(pipeline);
+    HddsProtos.Pipeline pipelineProto = pipeline
+        .getProtobufMessage(ClientVersions.CURRENT_VERSION);
+    stateManager.addPipeline(pipelineProto);
     // close the pipeline
-    stateManager.openPipeline(pipeline.getId());
+    openPipeline(pipelineProto);
     stateManager
         .addContainerToPipeline(pipeline.getId(), ContainerID.valueOf(1));
 
     try {
-      stateManager.removePipeline(pipeline.getId());
+      removePipeline(pipelineProto);
       Assert.fail("Pipeline should not have been removed");
     } catch (IOException e) {
       // can not remove a pipeline which already has containers
@@ -286,18 +348,20 @@ public class TestPipelineStateManager {
     }
 
     // close the pipeline
-    stateManager.finalizePipeline(pipeline.getId());
+    finalizePipeline(pipelineProto);
     // remove containers and then remove the pipeline
-    removePipeline(pipeline);
+    removePipeline(pipelineProto);
   }
 
   @Test
   public void testRemoveContainer() throws IOException {
     long containerID = 1;
     Pipeline pipeline = createDummyPipeline(1);
+    HddsProtos.Pipeline pipelineProto = pipeline
+        .getProtobufMessage(ClientVersions.CURRENT_VERSION);
     // create an open pipeline in stateMap
-    stateManager.addPipeline(pipeline);
-    stateManager.openPipeline(pipeline.getId());
+    stateManager.addPipeline(pipelineProto);
+    openPipeline(pipelineProto);
 
     stateManager.addContainerToPipeline(pipeline.getId(),
         ContainerID.valueOf(containerID));
@@ -314,7 +378,7 @@ public class TestPipelineStateManager {
     Assert.assertEquals(2, stateManager.getContainers(pipeline.getId()).size());
 
     // move pipeline to closing state
-    stateManager.finalizePipeline(pipeline.getId());
+    finalizePipeline(pipelineProto);
 
     stateManager.removeContainerFromPipeline(pipeline.getId(),
         ContainerID.valueOf(containerID));
@@ -323,57 +387,66 @@ public class TestPipelineStateManager {
     Assert.assertEquals(0, stateManager.getContainers(pipeline.getId()).size());
 
     // clean up
-    stateManager.removePipeline(pipeline.getId());
+    removePipeline(pipelineProto);
   }
 
   @Test
   public void testFinalizePipeline() throws IOException {
     Pipeline pipeline = createDummyPipeline(1);
-    stateManager.addPipeline(pipeline);
+    HddsProtos.Pipeline pipelineProto = pipeline
+        .getProtobufMessage(ClientVersions.CURRENT_VERSION);
+    stateManager.addPipeline(pipelineProto);
     // finalize on ALLOCATED pipeline
-    stateManager.finalizePipeline(pipeline.getId());
+    finalizePipeline(pipelineProto);
     Assert.assertEquals(Pipeline.PipelineState.CLOSED,
         stateManager.getPipeline(pipeline.getId()).getPipelineState());
     // clean up
-    removePipeline(pipeline);
+    removePipeline(pipelineProto);
 
     pipeline = createDummyPipeline(1);
-    stateManager.addPipeline(pipeline);
-    stateManager.openPipeline(pipeline.getId());
+    pipelineProto = pipeline
+        .getProtobufMessage(ClientVersions.CURRENT_VERSION);
+    stateManager.addPipeline(pipelineProto);
+    openPipeline(pipelineProto);
     // finalize on OPEN pipeline
-    stateManager.finalizePipeline(pipeline.getId());
+    finalizePipeline(pipelineProto);
     Assert.assertEquals(Pipeline.PipelineState.CLOSED,
         stateManager.getPipeline(pipeline.getId()).getPipelineState());
     // clean up
-    removePipeline(pipeline);
+    removePipeline(pipelineProto);
 
     pipeline = createDummyPipeline(1);
-    stateManager.addPipeline(pipeline);
-    stateManager.openPipeline(pipeline.getId());
-    stateManager.finalizePipeline(pipeline.getId());
+    pipelineProto = pipeline
+        .getProtobufMessage(ClientVersions.CURRENT_VERSION);
+    stateManager.addPipeline(pipelineProto);
+    openPipeline(pipelineProto);
+    finalizePipeline(pipelineProto);
     // finalize should work on already closed pipeline
-    stateManager.finalizePipeline(pipeline.getId());
+    finalizePipeline(pipelineProto);
     Assert.assertEquals(Pipeline.PipelineState.CLOSED,
         stateManager.getPipeline(pipeline.getId()).getPipelineState());
     // clean up
-    removePipeline(pipeline);
+    removePipeline(pipelineProto);
   }
 
   @Test
   public void testOpenPipeline() throws IOException {
     Pipeline pipeline = createDummyPipeline(1);
-    stateManager.addPipeline(pipeline);
+    HddsProtos.Pipeline pipelineProto = pipeline
+        .getProtobufMessage(ClientVersions.CURRENT_VERSION);
+    stateManager.addPipeline(pipelineProto);
     // open on ALLOCATED pipeline
-    stateManager.openPipeline(pipeline.getId());
+    openPipeline(pipelineProto);
     Assert.assertEquals(Pipeline.PipelineState.OPEN,
         stateManager.getPipeline(pipeline.getId()).getPipelineState());
 
-    stateManager.openPipeline(pipeline.getId());
+    openPipeline(pipelineProto);
     // open should work on already open pipeline
     Assert.assertEquals(Pipeline.PipelineState.OPEN,
         stateManager.getPipeline(pipeline.getId()).getPipelineState());
     // clean up
-    removePipeline(pipeline);
+    finalizePipeline(pipelineProto);
+    removePipeline(pipelineProto);
   }
 
   @Test
@@ -381,14 +454,16 @@ public class TestPipelineStateManager {
     Pipeline pipeline = createDummyPipeline(HddsProtos.ReplicationType.RATIS,
         HddsProtos.ReplicationFactor.THREE, 3);
     // pipeline in allocated state should not be reported
-    stateManager.addPipeline(pipeline);
+    HddsProtos.Pipeline pipelineProto = pipeline
+        .getProtobufMessage(ClientVersions.CURRENT_VERSION);
+    stateManager.addPipeline(pipelineProto);
     Assert.assertEquals(0, stateManager
         .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
             Pipeline.PipelineState.OPEN)
         .size());
 
     // pipeline in open state should be reported
-    stateManager.openPipeline(pipeline.getId());
+    openPipeline(pipelineProto);
     Assert.assertEquals(1, stateManager
         .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
             Pipeline.PipelineState.OPEN)
@@ -399,27 +474,47 @@ public class TestPipelineStateManager {
     pipeline2 = Pipeline.newBuilder(pipeline2)
         .setState(Pipeline.PipelineState.OPEN)
         .build();
+    HddsProtos.Pipeline pipelineProto2 = pipeline2
+        .getProtobufMessage(ClientVersions.CURRENT_VERSION);
     // pipeline in open state should be reported
-    stateManager.addPipeline(pipeline2);
+    stateManager.addPipeline(pipelineProto2);
     Assert.assertEquals(2, stateManager
         .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
             Pipeline.PipelineState.OPEN)
         .size());
 
     // pipeline in closed state should not be reported
-    stateManager.finalizePipeline(pipeline2.getId());
+    finalizePipeline(pipelineProto2);
     Assert.assertEquals(1, stateManager
         .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
             Pipeline.PipelineState.OPEN)
         .size());
 
     // clean up
-    removePipeline(pipeline);
-    removePipeline(pipeline2);
+    finalizePipeline(pipelineProto);
+    removePipeline(pipelineProto);
+    finalizePipeline(pipelineProto2);
+    removePipeline(pipelineProto2);
   }
 
-  private void removePipeline(Pipeline pipeline) throws IOException {
-    stateManager.finalizePipeline(pipeline.getId());
+  private void removePipeline(HddsProtos.Pipeline pipeline) throws IOException {
     stateManager.removePipeline(pipeline.getId());
   }
+
+  private void openPipeline(HddsProtos.Pipeline pipeline) throws IOException {
+    stateManager.updatePipelineState(pipeline.getId(),
+        HddsProtos.PipelineState.PIPELINE_OPEN);
+  }
+
+  private void finalizePipeline(HddsProtos.Pipeline pipeline)
+      throws IOException {
+    stateManager.updatePipelineState(pipeline.getId(),
+        HddsProtos.PipelineState.PIPELINE_CLOSED);
+  }
+
+  private void deactivatePipeline(HddsProtos.Pipeline pipeline)
+      throws IOException {
+    stateManager.updatePipelineState(pipeline.getId(),
+        HddsProtos.PipelineState.PIPELINE_DORMANT);
+  }
 }
\ No newline at end of file
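
The helper methods above spell out the V2 contract: every mutation takes the
protobuf form of a pipeline, and the former openPipeline, finalizePipeline
and deactivatePipeline calls collapse into updatePipelineState with the
target PIPELINE_* state. A minimal lifecycle sketch, assembled from the calls
in this test and assuming a stateManager built as in init():

    Pipeline pipeline = createDummyPipeline(1);
    HddsProtos.Pipeline pipelineProto = pipeline
        .getProtobufMessage(ClientVersions.CURRENT_VERSION);

    stateManager.addPipeline(pipelineProto);                 // ALLOCATED
    stateManager.updatePipelineState(pipelineProto.getId(),
        HddsProtos.PipelineState.PIPELINE_OPEN);             // OPEN
    stateManager.addContainerToPipeline(pipeline.getId(),
        ContainerID.valueOf(1));
    stateManager.updatePipelineState(pipelineProto.getId(),
        HddsProtos.PipelineState.PIPELINE_CLOSED);           // CLOSED
    // Removal expects a CLOSED pipeline with no containers, hence the
    // finalize-then-remove pattern in the cleanups above.
    stateManager.removeContainerFromPipeline(pipeline.getId(),
        ContainerID.valueOf(1));
    stateManager.removePipeline(pipelineProto.getId());

Note that mutations now travel through the replicated state machine, so an
invalid add (replication factor and node count do not match) surfaces as a
Ratis StateMachineException rather than the V1 IllegalArgumentException, as
the updated testAddAndGetPipeline asserts.
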
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 7eb8977..d1f383c 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -27,17 +29,27 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.ClientVersions;
+import org.apache.ozone.test.GenericTestUtils;
 import org.junit.Assert;
 import org.junit.Assume;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 import static org.apache.commons.collections.CollectionUtils.intersection;
@@ -58,6 +70,8 @@ public class TestRatisPipelineProvider {
   private MockNodeManager nodeManager;
   private RatisPipelineProvider provider;
   private PipelineStateManager stateManager;
+  private File testDir;
+  private DBStore dbStore;
 
   public void init(int maxPipelinePerNode) throws Exception {
     init(maxPipelinePerNode, new OzoneConfiguration());
@@ -65,15 +79,34 @@ public class TestRatisPipelineProvider {
 
   public void init(int maxPipelinePerNode, OzoneConfiguration conf)
       throws Exception {
+    testDir = GenericTestUtils.getTestDir(
+        TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    dbStore = DBStoreBuilder.createDBStore(
+        conf, new SCMDBDefinition());
     nodeManager = new MockNodeManager(true, 10);
     nodeManager.setNumPipelinePerDatanode(maxPipelinePerNode);
+    SCMHAManager scmhaManager = MockSCMHAManager.getInstance(true);
     conf.setInt(ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT,
         maxPipelinePerNode);
-    stateManager = new PipelineStateManager();
+    stateManager = PipelineStateManagerImpl.newBuilder()
+        .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setNodeManager(nodeManager)
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
     provider = new MockRatisPipelineProvider(nodeManager,
         stateManager, conf);
   }
 
+  private void cleanup() throws Exception {
+    if (dbStore != null) {
+      dbStore.close();
+    }
+
+    FileUtil.fullyDelete(testDir);
+  }
+
   private static void assertPipelineProperties(
       Pipeline pipeline, HddsProtos.ReplicationFactor expectedFactor,
       HddsProtos.ReplicationType expectedReplicationType,
@@ -90,10 +123,14 @@ public class TestRatisPipelineProvider {
     Pipeline pipeline = provider.create(new RatisReplicationConfig(factor));
     assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
         Pipeline.PipelineState.ALLOCATED);
-    stateManager.addPipeline(pipeline);
+    HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
+        ClientVersions.CURRENT_VERSION);
+    stateManager.addPipeline(pipelineProto);
     nodeManager.addPipeline(pipeline);
 
     Pipeline pipeline1 = provider.create(new RatisReplicationConfig(factor));
+    HddsProtos.Pipeline pipelineProto1 = pipeline1.getProtobufMessage(
+        ClientVersions.CURRENT_VERSION);
     assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE,
         Pipeline.PipelineState.ALLOCATED);
     // New pipeline should not overlap with the previously created pipeline
@@ -103,7 +140,7 @@ public class TestRatisPipelineProvider {
     if (pipeline.getReplicationConfig().getRequiredNodes() == 3) {
       assertNotEquals(pipeline.getNodeSet(), pipeline1.getNodeSet());
     }
-    stateManager.addPipeline(pipeline1);
+    stateManager.addPipeline(pipelineProto1);
     nodeManager.addPipeline(pipeline1);
   }
 
@@ -111,12 +148,14 @@ public class TestRatisPipelineProvider {
   public void testCreatePipelineWithFactorThree() throws Exception {
     init(1);
     createPipelineAndAssertions(HddsProtos.ReplicationFactor.THREE);
+    cleanup();
   }
 
   @Test
   public void testCreatePipelineWithFactorOne() throws Exception {
     init(1);
     createPipelineAndAssertions(HddsProtos.ReplicationFactor.ONE);
+    cleanup();
   }
 
   private List<DatanodeDetails> createListOfNodes(int nodeCount) {
@@ -134,16 +173,21 @@ public class TestRatisPipelineProvider {
     Pipeline pipeline = provider.create(new RatisReplicationConfig(factor));
     assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
         Pipeline.PipelineState.ALLOCATED);
-    stateManager.addPipeline(pipeline);
+    HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
+        ClientVersions.CURRENT_VERSION);
+    stateManager.addPipeline(pipelineProto);
 
     factor = HddsProtos.ReplicationFactor.ONE;
     Pipeline pipeline1 = provider.create(new RatisReplicationConfig(factor));
     assertPipelineProperties(pipeline1, factor, REPLICATION_TYPE,
         Pipeline.PipelineState.ALLOCATED);
-    stateManager.addPipeline(pipeline1);
+    HddsProtos.Pipeline pipelineProto1 = pipeline1.getProtobufMessage(
+        ClientVersions.CURRENT_VERSION);
+    stateManager.addPipeline(pipelineProto1);
     // With enough pipeline quota on datanodes, they should not share
     // the same set of datanodes.
     assertNotEquals(pipeline.getNodeSet(), pipeline1.getNodeSet());
+    cleanup();
   }
 
   @Test
@@ -161,6 +205,7 @@ public class TestRatisPipelineProvider {
         createListOfNodes(factor.getNumber()));
     assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
         Pipeline.PipelineState.OPEN);
+    cleanup();
   }
 
   @Test
@@ -177,6 +222,7 @@ public class TestRatisPipelineProvider {
         new RatisReplicationConfig(ReplicationFactor.THREE), healthyNodes);
 
     Assert.assertEquals(pipeline1.getNodeSet(), pipeline2.getNodeSet());
+    cleanup();
   }
 
   @Test
@@ -211,8 +257,10 @@ public class TestRatisPipelineProvider {
         new RatisReplicationConfig(factor));
     assertPipelineProperties(pipeline, factor, REPLICATION_TYPE,
         Pipeline.PipelineState.ALLOCATED);
+    HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
+        ClientVersions.CURRENT_VERSION);
     nodeManager.addPipeline(pipeline);
-    stateManager.addPipeline(pipeline);
+    stateManager.addPipeline(pipelineProto);
 
     List<DatanodeDetails> nodes = pipeline.getNodes();
 
@@ -223,6 +271,7 @@ public class TestRatisPipelineProvider {
     assertTrue(
         "at least 1 node should have been from members of closed pipelines",
         nodes.stream().anyMatch(membersOfClosedPipelines::contains));
+    cleanup();
   }
 
   @Test
@@ -257,6 +306,7 @@ public class TestRatisPipelineProvider {
         Assert.assertTrue(ex.getMessage().contains(expectedErrorSubstring));
       }
     }
+    cleanup();
   }
 
   private void addPipeline(
@@ -269,8 +319,10 @@ public class TestRatisPipelineProvider {
         .setState(open)
         .setId(PipelineID.randomId())
         .build();
+    HddsProtos.Pipeline pipelineProto = openPipeline.getProtobufMessage(
+        ClientVersions.CURRENT_VERSION);
 
-    stateManager.addPipeline(openPipeline);
+    stateManager.addPipeline(pipelineProto);
     nodeManager.addPipeline(openPipeline);
   }
 }
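
These provider tests no longer get an in-memory state manager; each init()
now stands up a RocksDB-backed pipeline table plus a mock HA layer, and the
cleanup() tears both down. The recurring scaffolding as a sketch (the test
directory prefix here is arbitrary; the diff happens to reuse
TestContainerManagerImpl's simple name for it):

    OzoneConfiguration conf = SCMTestUtils.getConf();
    File testDir = GenericTestUtils.getTestDir(
        "pipeline-state-" + UUID.randomUUID());
    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
    DBStore dbStore = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
    SCMHAManager scmhaManager = MockSCMHAManager.getInstance(true);

    PipelineStateManager stateManager = PipelineStateManagerImpl.newBuilder()
        .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
        .setRatisServer(scmhaManager.getRatisServer())
        .setNodeManager(new MockNodeManager(true, 10))
        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
        .build();

    // ... exercise the provider against stateManager ...

    dbStore.close();                // release the RocksDB handle
    FileUtil.fullyDelete(testDir);  // remove the on-disk store
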
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
deleted file mode 100644
index fb13d05..0000000
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES;
-
-import java.io.IOException;
-import java.math.BigInteger;
-import java.security.cert.X509Certificate;
-
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
-import org.apache.hadoop.hdds.security.x509.certificate.CertInfo;
-import org.apache.hadoop.hdds.utils.TransactionInfo;
-import org.apache.hadoop.hdds.scm.metadata.PipelineCodec;
-import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
-import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
-import org.apache.hadoop.hdds.security.x509.crl.CRLInfo;
-import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
-import org.apache.hadoop.hdds.utils.db.Codec;
-import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
-import org.apache.hadoop.hdds.utils.db.DBDefinition;
-import org.apache.hadoop.hdds.utils.db.DBStore;
-import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-
-/**
- * Test SCM Metadata Store that has ONLY the pipeline table whose key uses the
- * old codec format.
- */
-public class TestSCMStoreImplWithOldPipelineIDKeyFormat
-    implements SCMMetadataStore {
-
-  private DBStore store;
-  private final OzoneConfiguration configuration;
-  private Table<PipelineID, Pipeline> pipelineTable;
-
-  public TestSCMStoreImplWithOldPipelineIDKeyFormat(
-      OzoneConfiguration config) throws IOException {
-    this.configuration = config;
-    start(configuration);
-  }
-
-  @Override
-  public void start(OzoneConfiguration config)
-      throws IOException {
-    if (this.store == null) {
-      this.store = DBStoreBuilder.createDBStore(config,
-          new SCMDBTestDefinition());
-      pipelineTable = PIPELINES.getTable(store);
-    }
-  }
-
-  @Override
-  public void stop() throws Exception {
-    if (store != null) {
-      store.close();
-      store = null;
-    }
-  }
-
-  @Override
-  public DBStore getStore() {
-    return null;
-  }
-
-  @Override
-  public Table<Long, DeletedBlocksTransaction> getDeletedBlocksTXTable() {
-    return null;
-  }
-
-  @Override
-  public Table<BigInteger, X509Certificate> getValidCertsTable() {
-    return null;
-  }
-
-  @Override
-  public Table<BigInteger, X509Certificate> getValidSCMCertsTable() {
-    return null;
-  }
-
-  @Override
-  public Table<BigInteger, X509Certificate> getRevokedCertsTable() {
-    return null;
-  }
-
-  @Override
-  public Table<BigInteger, CertInfo> getRevokedCertsV2Table() {
-    return null;
-  }
-
-  @Override
-  public Table<Long, CRLInfo> getCRLInfoTable() {
-    return null;
-  }
-
-  @Override
-  public Table<String, Long> getCRLSequenceIdTable() {
-    return null;
-  }
-
-  @Override
-  public Table<String, Long> getSequenceIdTable() {
-    return null;
-  }
-
-  @Override
-  public TableIterator getAllCerts(CertificateStore.CertType certType) {
-    return null;
-  }
-
-  @Override
-  public Table<PipelineID, Pipeline> getPipelineTable() {
-    return pipelineTable;
-  }
-
-  @Override
-  public Table<String, TransactionInfo> getTransactionInfoTable() {
-    return null;
-  }
-
-  @Override
-  public BatchOperationHandler getBatchHandler() {
-    return null;
-  }
-
-  @Override
-  public Table<ContainerID, ContainerInfo> getContainerTable() {
-    return null;
-  }
-
-  @Override
-  public Table<ContainerID, MoveDataNodePair> getMoveTable() {
-    return null;
-  }
-
-  /**
-   * Test SCM DB Definition for the above class.
-   */
-  public static class SCMDBTestDefinition implements DBDefinition {
-
-    public static final DBColumnFamilyDefinition<PipelineID, Pipeline>
-        PIPELINES =
-        new DBColumnFamilyDefinition<>(
-            "pipelines",
-            PipelineID.class,
-            new OldPipelineIDCodec(),
-            Pipeline.class,
-            new PipelineCodec());
-
-    @Override
-    public String getName() {
-      return "scm.db";
-    }
-
-    @Override
-    public String getLocationConfigKey() {
-      return ScmConfigKeys.OZONE_SCM_DB_DIRS;
-    }
-
-    @Override
-    public DBColumnFamilyDefinition[] getColumnFamilies() {
-      return new DBColumnFamilyDefinition[] {PIPELINES};
-    }
-  }
-
-  /**
-   * Old Pipeline ID codec that relies on protobuf serialization.
-   */
-  public static class OldPipelineIDCodec implements Codec<PipelineID> {
-    @Override
-    public byte[] toPersistedFormat(PipelineID object) throws IOException {
-      return object.getProtobuf().toByteArray();
-    }
-
-    @Override
-    public PipelineID fromPersistedFormat(byte[] rawData) throws IOException {
-      return null;
-    }
-
-    @Override
-    public PipelineID copyObject(PipelineID object) {
-      throw new UnsupportedOperationException();
-    }
-  }
-
-}
-
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
index f21fc00..8fa1509 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
@@ -18,19 +18,34 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.container.TestContainerManagerImpl;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.ozone.ClientVersions;
+import org.apache.hadoop.ozone.container.common.SCMTestUtils;
+import org.apache.ozone.test.GenericTestUtils;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.UUID;
 
 /**
  * Test for SimplePipelineProvider.
@@ -40,20 +55,45 @@ public class TestSimplePipelineProvider {
   private NodeManager nodeManager;
   private PipelineProvider provider;
   private PipelineStateManager stateManager;
+  private File testDir;
+  private DBStore dbStore;
 
   @Before
   public void init() throws Exception {
     nodeManager = new MockNodeManager(true, 10);
-    stateManager = new PipelineStateManager();
+    final OzoneConfiguration conf = SCMTestUtils.getConf();
+    testDir = GenericTestUtils.getTestDir(
+        TestContainerManagerImpl.class.getSimpleName() + UUID.randomUUID());
+    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
+    dbStore = DBStoreBuilder.createDBStore(
+        conf, new SCMDBDefinition());
+    SCMHAManager scmhaManager = MockSCMHAManager.getInstance(true);
+    stateManager = PipelineStateManagerImpl.newBuilder()
+        .setPipelineStore(SCMDBDefinition.PIPELINES.getTable(dbStore))
+        .setRatisServer(scmhaManager.getRatisServer())
+        .setNodeManager(nodeManager)
+        .setSCMDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+        .build();
     provider = new SimplePipelineProvider(nodeManager, stateManager);
   }
 
+  @After
+  public void cleanup() throws Exception {
+    if (dbStore != null) {
+      dbStore.close();
+    }
+
+    FileUtil.fullyDelete(testDir);
+  }
+
   @Test
   public void testCreatePipelineWithFactor() throws IOException {
     HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
     Pipeline pipeline =
         provider.create(new StandaloneReplicationConfig(factor));
-    stateManager.addPipeline(pipeline);
+    HddsProtos.Pipeline pipelineProto = pipeline.getProtobufMessage(
+        ClientVersions.CURRENT_VERSION);
+    stateManager.addPipeline(pipelineProto);
     Assert.assertEquals(pipeline.getType(),
         HddsProtos.ReplicationType.STAND_ALONE);
     Assert.assertEquals(pipeline.getReplicationConfig().getRequiredNodes(),
@@ -65,7 +105,9 @@ public class TestSimplePipelineProvider {
     factor = HddsProtos.ReplicationFactor.ONE;
     Pipeline pipeline1 =
         provider.create(new StandaloneReplicationConfig(factor));
-    stateManager.addPipeline(pipeline1);
+    HddsProtos.Pipeline pipelineProto1 = pipeline1.getProtobufMessage(
+        ClientVersions.CURRENT_VERSION);
+    stateManager.addPipeline(pipelineProto1);
     Assert.assertEquals(pipeline1.getType(),
         HddsProtos.ReplicationType.STAND_ALONE);
     Assert.assertEquals(
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
index 8974f98..0cade56 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/leader/choose/algorithms/TestLeaderChoosePolicy.java
@@ -21,7 +21,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManagerImpl;
 import org.apache.hadoop.hdds.scm.pipeline.RatisPipelineProvider;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.junit.Assert;
@@ -46,7 +46,7 @@ public class TestLeaderChoosePolicy {
   public void testDefaultPolicy() {
     RatisPipelineProvider ratisPipelineProvider = new RatisPipelineProvider(
         mock(NodeManager.class),
-        mock(PipelineStateManager.class),
+        mock(PipelineStateManagerImpl.class),
         conf,
         mock(EventPublisher.class),
         SCMContext.emptyContext());
@@ -63,7 +63,7 @@ public class TestLeaderChoosePolicy {
             ".HelloWorld");
     new RatisPipelineProvider(
         mock(NodeManager.class),
-        mock(PipelineStateManager.class),
+        mock(PipelineStateManagerImpl.class),
         conf,
         mock(EventPublisher.class),
         SCMContext.emptyContext());
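
With PipelineStateManager now an interface (implemented by
PipelineStateManagerImpl), tests that only need a stub can hand Mockito
either type; this one mocks the concrete class. A sketch of the construction
above, assuming the usual static import of Mockito.mock:

    RatisPipelineProvider provider = new RatisPipelineProvider(
        mock(NodeManager.class),
        mock(PipelineStateManagerImpl.class),  // bare stub, never driven here
        new OzoneConfiguration(),
        mock(EventPublisher.class),
        SCMContext.emptyContext());
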
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
index 533608d..0094320 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
@@ -31,8 +31,8 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineFactory;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManagerV2Impl;
-import org.apache.hadoop.hdds.scm.pipeline.StateManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManagerImpl;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.hdds.utils.db.Table;
 import org.apache.hadoop.ozone.ClientVersions;
@@ -55,7 +55,7 @@ public final class ReconPipelineManager extends PipelineManagerImpl {
   private ReconPipelineManager(ConfigurationSource conf,
                                SCMHAManager scmhaManager,
                                NodeManager nodeManager,
-                               StateManager pipelineStateManager,
+                               PipelineStateManager pipelineStateManager,
                                PipelineFactory pipelineFactory,
                                EventPublisher eventPublisher,
                                SCMContext scmContext) {
@@ -71,8 +71,8 @@ public final class ReconPipelineManager extends PipelineManagerImpl {
       SCMHAManager scmhaManager,
       SCMContext scmContext) throws IOException {
 
-    // Create PipelineStateManager
-    StateManager stateManager = PipelineStateManagerV2Impl
+    // Create PipelineStateManagerImpl
+    PipelineStateManager stateManager = PipelineStateManagerImpl
         .newBuilder()
         .setPipelineStore(pipelineStore)
         .setNodeManager(nodeManager)
