Posted to commits@ozone.apache.org by so...@apache.org on 2021/06/25 19:01:33 UTC

[ozone] branch HDDS-3816-ec updated: HDDS-4892. EC: Implement basic EC pipeline provider (#2353)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
     new 206c935  HDDS-4892. EC: Implement basic EC pipeline provider (#2353)
206c935 is described below

commit 206c9358a5687e3c33f1d31e1c00f3e2d6130d1d
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Fri Jun 25 20:01:17 2021 +0100

    HDDS-4892. EC: Implement basic EC pipeline provider (#2353)
---
 .../apache/hadoop/hdds/scm/pipeline/Pipeline.java  |   6 +
 .../hadoop/hdds/scm/pipeline/MockPipeline.java     |  16 +-
 .../hadoop/hdds/scm/pipeline/TestPipeline.java     |   6 +
 .../algorithms/SCMContainerPlacementMetrics.java   |  10 +
 .../hdds/scm/pipeline/ECPipelineProvider.java      |  88 +++++
 .../hadoop/hdds/scm/pipeline/PipelineFactory.java  |  16 +
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |  11 +
 .../hdds/scm/pipeline/PipelineManagerV2Impl.java   |  13 +
 .../hdds/scm/pipeline/PipelineStateManager.java    |  13 +
 .../scm/pipeline/PipelineStateManagerV2Impl.java   |  20 ++
 .../hadoop/hdds/scm/pipeline/PipelineStateMap.java |  30 ++
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  11 +
 .../hadoop/hdds/scm/pipeline/StateManager.java     |  12 +
 .../scm/pipeline/WritableContainerFactory.java     |   8 +
 .../scm/pipeline/WritableECContainerProvider.java  | 229 +++++++++++++
 .../TestSCMContainerPlacementRackAware.java        |   6 +
 .../hdds/scm/pipeline/MockPipelineManager.java     |  17 +-
 .../hdds/scm/pipeline/TestECPipelineProvider.java  |  84 +++++
 .../hdds/scm/pipeline/TestPipelineManagerImpl.java |   6 +
 .../hdds/scm/pipeline/TestPipelineStateMap.java    |  95 ++++++
 .../pipeline/TestWritableECContainerProvider.java  | 372 +++++++++++++++++++++
 21 files changed, 1059 insertions(+), 10 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 33bb70a..638180c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -263,6 +263,12 @@ public final class Pipeline {
   }
 
   public boolean isHealthy() {
+    // EC pipelines are not reported by the DN and do not have a leader. If a
+    // node goes stale or dead, EC pipelines will be closed like RATIS
+    // pipelines, but at present there are no other health metrics for EC.
+    if (replicationConfig.getReplicationType() == ReplicationType.EC) {
+      return true;
+    }
     for (Long reportedTime : nodeStatus.values()) {
       if (reportedTime < 0) {
         return false;
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
index 259b57c..b2225d2 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
@@ -91,14 +91,15 @@ public final class MockPipeline {
   }
 
   public static Pipeline createEcPipeline() {
+    return createEcPipeline(new ECReplicationConfig(3, 2));
+  }
 
-    List<DatanodeDetails> nodes = new ArrayList<>();
-    nodes.add(MockDatanodeDetails.randomDatanodeDetails());
-    nodes.add(MockDatanodeDetails.randomDatanodeDetails());
-    nodes.add(MockDatanodeDetails.randomDatanodeDetails());
-    nodes.add(MockDatanodeDetails.randomDatanodeDetails());
-    nodes.add(MockDatanodeDetails.randomDatanodeDetails());
+  public static Pipeline createEcPipeline(ECReplicationConfig repConfig) {
 
+    List<DatanodeDetails> nodes = new ArrayList<>();
+    for (int i=0; i<repConfig.getRequiredNodes(); i++) {
+      nodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    }
     Map<DatanodeDetails, Integer> nodeIndexes = new HashMap<>();
 
     int index = nodes.size() - 1;
@@ -110,8 +111,7 @@ public final class MockPipeline {
     return Pipeline.newBuilder()
         .setState(Pipeline.PipelineState.OPEN)
         .setId(PipelineID.randomId())
-        .setReplicationConfig(
-            new ECReplicationConfig(3, 2))
+        .setReplicationConfig(repConfig)
         .setNodes(nodes)
         .setReplicaIndexes(nodeIndexes)
         .build();
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipeline.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipeline.java
index 0b9e9fe..cf7dd4d 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipeline.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipeline.java
@@ -81,4 +81,10 @@ public class TestPipeline {
           reloadedPipeline.getReplicaIndex(dn));
     }
   }
+
+  @Test
+  public void testECPipelineIsAlwaysHealthy() throws IOException {
+    Pipeline pipeline = MockPipeline.createEcPipeline();
+    Assert.assertTrue(pipeline.isHealthy());
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementMetrics.java
index 1ca68bd..b4a1436 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementMetrics.java
@@ -53,8 +53,18 @@ public class SCMContainerPlacementMetrics implements MetricsSource {
   public SCMContainerPlacementMetrics() {
   }
 
+  /**
+   * Return the existing instance of SCMContainerPlacementMetrics if it is
+   * registered in the metrics system; otherwise create a new instance,
+   * register it and return it.
+   * @return A new or existing SCMContainerPlacementMetrics object
+   */
   public static SCMContainerPlacementMetrics create() {
     MetricsSystem ms = DefaultMetricsSystem.instance();
+    MetricsSource existingSource = ms.getSource(SOURCE_NAME);
+    if (existingSource != null) {
+      return (SCMContainerPlacementMetrics)existingSource;
+    }
     registry = new MetricsRegistry(RECORD_INFO);
     return ms.register(SOURCE_NAME, "SCM Container Placement Metrics",
         new SCMContainerPlacementMetrics());
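
The create() change above makes the metrics source a get-or-create singleton:
repeated calls return the already-registered source instead of attempting to
register SOURCE_NAME a second time. A minimal sketch of the resulting
behaviour (assuming the default metrics system has been initialised):

    // Both calls resolve to the same registered source; PipelineFactory can
    // now call create() even if the metrics were registered elsewhere.
    SCMContainerPlacementMetrics first = SCMContainerPlacementMetrics.create();
    SCMContainerPlacementMetrics second = SCMContainerPlacementMetrics.create();
    assert first == second;
    // Callers that own the source should still unregister it when done, as
    // the new @After method in TestSCMContainerPlacementRackAware does.
    first.unRegister();
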
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
new file mode 100644
index 0000000..5ce442f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class to create pipelines for EC containers.
+ */
+public class ECPipelineProvider extends PipelineProvider<ECReplicationConfig> {
+
+  // TODO - EC Placement Policy. Standard Network Aware topology will not work
+  //        for EC as it stands. We may want an "as many racks as possible"
+  //        policy. HDDS-5326.
+
+  private final ConfigurationSource conf;
+  private final PlacementPolicy placementPolicy;
+
+  public ECPipelineProvider(NodeManager nodeManager,
+                            StateManager stateManager,
+                            ConfigurationSource conf,
+                            PlacementPolicy placementPolicy) {
+    super(nodeManager, stateManager);
+    this.conf = conf;
+    this.placementPolicy = placementPolicy;
+  }
+
+  @Override
+  protected Pipeline create(ECReplicationConfig replicationConfig)
+      throws IOException {
+    List<DatanodeDetails> dns = placementPolicy.chooseDatanodes(null,
+        null, replicationConfig.getRequiredNodes(), 0);
+    return create(replicationConfig, dns);
+  }
+
+  @Override
+  protected Pipeline create(ECReplicationConfig replicationConfig,
+      List<DatanodeDetails> nodes) {
+
+    Map<DatanodeDetails, Integer> dnIndexes = new HashMap<>();
+    int ecIndex = 1;
+    for (DatanodeDetails dn : nodes) {
+      dnIndexes.put(dn, ecIndex);
+      ecIndex++;
+    }
+
+    return Pipeline.newBuilder()
+        .setId(PipelineID.randomId())
+        .setState(Pipeline.PipelineState.ALLOCATED)
+        .setReplicationConfig(replicationConfig)
+        .setNodes(nodes)
+        .setReplicaIndexes(dnIndexes)
+        .build();
+  }
+
+  @Override
+  protected void close(Pipeline pipeline) throws IOException {
+  }
+
+  @Override
+  protected void shutdown() {
+  }
+
+}
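
A rough usage sketch of the provider above (mirroring TestECPipelineProvider
later in this commit; the provider and config variable names are illustrative):

    ECReplicationConfig ecConf = new ECReplicationConfig(3, 2);
    Pipeline pipeline = provider.create(ecConf);
    // The placement policy chooses data + parity = 5 nodes, and each datanode
    // is assigned a replica index from 1..5 in selection order.
    for (DatanodeDetails dn : pipeline.getNodes()) {
      int replicaIndex = pipeline.getReplicaIndex(dn); // 1-based EC index
    }
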
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 0fbabee..b70b1a4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -23,6 +23,10 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -50,6 +54,18 @@ public class PipelineFactory {
         new RatisPipelineProvider(nodeManager,
             stateManager, conf,
             eventPublisher, scmContext));
+    PlacementPolicy placementPolicy;
+    try {
+      placementPolicy = ContainerPlacementPolicyFactory.getPolicy(conf,
+          nodeManager, nodeManager.getClusterNetworkTopologyMap(), true,
+          SCMContainerPlacementMetrics.create());
+    } catch (SCMException e) {
+      throw new RuntimeException("Unable to get the container placement policy",
+          e);
+    }
+    providers.put(ReplicationType.EC,
+        new ECPipelineProvider(nodeManager, stateManager, conf,
+            placementPolicy));
   }
 
   protected PipelineFactory() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index ad8faca..6a35249 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -65,6 +65,17 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
       Collection<PipelineID> excludePipelines
   );
 
+  /**
+   * Returns the count of pipelines meeting the given ReplicationConfig and
+   * state.
+   * @param replicationConfig The ReplicationConfig of the pipelines to count
+   * @param state The current state of the pipelines to count
+   * @return The count of pipelines meeting the above criteria
+   */
+  int getPipelineCount(
+      ReplicationConfig replicationConfig, Pipeline.PipelineState state
+  );
+
   void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID)
       throws IOException;
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index b7d8864..c383093 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -234,6 +234,19 @@ public class PipelineManagerV2Impl implements PipelineManager {
   }
 
   @Override
+  /**
+   * Returns the count of pipelines meeting the given ReplicationConfig and
+   * state.
+   * @param config The ReplicationConfig of the pipelines to count
+   * @param state The current state of the pipelines to count
+   * @return The count of pipelines meeting the above criteria
+   */
+  public int getPipelineCount(ReplicationConfig config,
+      Pipeline.PipelineState state) {
+    return stateManager.getPipelineCount(config, state);
+  }
+
+  @Override
   public void addContainerToPipeline(
       PipelineID pipelineID, ContainerID containerID) throws IOException {
     // should not lock here, since no ratis operation happens.
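
The new PipelineManager#getPipelineCount method simply delegates to the state
manager. A minimal usage sketch (assuming a pipelineManager instance, as
exercised by TestPipelineManagerImpl later in this commit):

    // Count the open EC 3-2 pipelines; WritableECContainerProvider compares
    // this against its configured minimum before allocating a new pipeline.
    int openEc = pipelineManager.getPipelineCount(
        new ECReplicationConfig(3, 2), Pipeline.PipelineState.OPEN);
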
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index c3df3d2..83e4ae5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -98,6 +98,19 @@ public class PipelineStateManager implements StateManager {
   }
 
   @Override
+  /**
+   * Returns the count of pipelines meeting the given ReplicationConfig and
+   * state.
+   * @param replicationConfig The ReplicationConfig of the pipelines to count
+   * @param state The current state of the pipelines to count
+   * @return The count of pipelines meeting the above criteria
+   */
+  public int getPipelineCount(ReplicationConfig replicationConfig,
+      PipelineState state) {
+    return pipelineStateMap.getPipelineCount(replicationConfig, state);
+  }
+
+  @Override
   public NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
       throws IOException {
     return pipelineStateMap.getContainers(pipelineID);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
index 7f52d93..2c1ef65 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
@@ -176,6 +176,26 @@ public class PipelineStateManagerV2Impl implements StateManager {
     }
   }
 
+
+  /**
+   * Returns the count of pipelines meeting the given ReplicationConfig and
+   * state.
+   * @param replicationConfig The ReplicationConfig of the pipelines to count
+   * @param state The current state of the pipelines to count
+   * @return The count of pipelines meeting the above criteria
+   */
+  @Override
+  public int getPipelineCount(
+      ReplicationConfig replicationConfig,
+      Pipeline.PipelineState state) {
+    lock.readLock().lock();
+    try {
+      return pipelineStateMap.getPipelineCount(replicationConfig, state);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
   @Override
   public NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
       throws IOException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index ac95b07..6d42777 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -184,6 +184,36 @@ class PipelineStateMap {
   }
 
   /**
+   * Get a count of pipelines with the given replicationConfig and state.
+   * This method is most efficient when getting a count for OPEN pipelines
+   * as the result can be obtained directly from the cached open list.
+   *
+   * @param replicationConfig - ReplicationConfig
+   * @param state             - Required PipelineState
+   * @return Count of pipelines with the specified replication config and state
+   */
+  int getPipelineCount(ReplicationConfig replicationConfig,
+      PipelineState state) {
+    Preconditions
+        .checkNotNull(replicationConfig, "ReplicationConfig cannot be null");
+    Preconditions.checkNotNull(state, "Pipeline state cannot be null");
+
+    if (state == PipelineState.OPEN) {
+      return query2OpenPipelines.getOrDefault(
+              replicationConfig, Collections.EMPTY_LIST).size();
+    }
+
+    int count = 0;
+    for (Pipeline pipeline : pipelineMap.values()) {
+      if (pipeline.getReplicationConfig().equals(replicationConfig)
+          && pipeline.getPipelineState() == state) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  /**
    * Get list of pipeline corresponding to specified replication type,
    * replication factor and pipeline state.
    *
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index c68a14f..cb2abb8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -380,6 +380,17 @@ public class SCMPipelineManager implements
   }
 
   @Override
+  public int getPipelineCount(ReplicationConfig replicationConfig,
+      Pipeline.PipelineState state) {
+    lock.readLock().lock();
+    try {
+      return stateManager.getPipelineCount(replicationConfig, state);
+    } finally {
+      lock.readLock().unlock();
+    }
+  }
+
+  @Override
   public void addContainerToPipeline(PipelineID pipelineID,
       ContainerID containerID) throws IOException {
     lock.writeLock().lock();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java
index c075dea..c3fedbe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java
@@ -91,6 +91,18 @@ public interface StateManager {
       Collection<PipelineID> excludePipelines
   );
 
+  /**
+   * Returns the count of pipelines meeting the given ReplicationConfig and
+   * state.
+   * @param replicationConfig The ReplicationConfig of the pipelines to count
+   * @param state The current state of the pipelines to count
+   * @return The count of pipelines meeting the above criteria
+   */
+  int getPipelineCount(
+      ReplicationConfig replicationConfig,
+      Pipeline.PipelineState state
+  );
+
   NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
       throws IOException;
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
index 356d047..2e74d96 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -33,12 +34,16 @@ public class WritableContainerFactory {
 
   private final WritableContainerProvider<ReplicationConfig> ratisProvider;
   private final WritableContainerProvider<ReplicationConfig> standaloneProvider;
+  private final WritableContainerProvider<ECReplicationConfig> ecProvider;
 
   public WritableContainerFactory(StorageContainerManager scm) {
     this.ratisProvider = new WritableRatisContainerProvider(
         scm.getConfiguration(), scm.getPipelineManager(),
         scm.getContainerManager(), scm.getPipelineChoosePolicy());
     this.standaloneProvider = ratisProvider;
+    this.ecProvider = new WritableECContainerProvider(scm.getConfiguration(),
+        scm.getPipelineManager(), scm.getContainerManager(),
+        scm.getPipelineChoosePolicy());
   }
 
   public ContainerInfo getContainer(final long size,
@@ -50,6 +55,9 @@ public class WritableContainerFactory {
           .getContainer(size, repConfig, owner, excludeList);
     case RATIS:
       return ratisProvider.getContainer(size, repConfig, owner, excludeList);
+    case EC:
+      return ecProvider.getContainer(size, (ECReplicationConfig)repConfig,
+          owner, excludeList);
     default:
       throw new IOException(repConfig.getReplicationType()
           + " is an invalid replication type");
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
new file mode 100644
index 0000000..2f3cf92
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
+
+/**
+ * Writable Container provider to obtain a writable container for EC pipelines.
+ */
+public class WritableECContainerProvider
+    implements WritableContainerProvider<ECReplicationConfig> {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(WritableECContainerProvider.class);
+
+  private final ConfigurationSource conf;
+  private final PipelineManager pipelineManager;
+  private final PipelineChoosePolicy pipelineChoosePolicy;
+  private final ContainerManagerV2 containerManager;
+  private final long containerSize;
+  private final WritableECContainerProviderConfig providerConfig;
+
+  public WritableECContainerProvider(ConfigurationSource conf,
+      PipelineManager pipelineManager, ContainerManagerV2 containerManager,
+      PipelineChoosePolicy pipelineChoosePolicy) {
+    this.conf = conf;
+    this.providerConfig =
+        conf.getObject(WritableECContainerProviderConfig.class);
+    this.pipelineManager = pipelineManager;
+    this.containerManager = containerManager;
+    this.pipelineChoosePolicy = pipelineChoosePolicy;
+    this.containerSize = getConfiguredContainerSize();
+  }
+
+  /**
+   * Get a writable EC container with space for a block of the given size.
+   * @param size The max size of block in bytes which will be written. This
+   *             comes from the Ozone Manager and will be the block size
+   *             configured for the cluster. The client cannot pass an
+   *             arbitrary value for this setting.
+   * @param repConfig The replication config indicating the EC data and parity
+   *                  block counts.
+   * @param owner The owner of the container
+   * @param excludeList A set of datanodes, containers and pipelines which
+   *                    should not be considered.
+   * @return A containerInfo representing a block group with space for the
+   *         write, or null if no container can be allocated.
+   * @throws IOException
+   */
+  @Override
+  public ContainerInfo getContainer(final long size,
+      ECReplicationConfig repConfig, String owner, ExcludeList excludeList)
+      throws IOException {
+    synchronized (this) {
+      int openPipelineCount = pipelineManager.getPipelineCount(repConfig,
+          Pipeline.PipelineState.OPEN);
+      if (openPipelineCount < providerConfig.getMinimumPipelines()) {
+        try {
+          // TODO - PipelineManager should allow for creating a pipeline with
+          //        excluded nodes. HDDS-5375.
+          return allocateContainer(repConfig, size, owner);
+        } catch (IOException e) {
+          LOG.warn("Unable to allocate a container for {} with {} existing "
+              + "pipelines", repConfig, openPipelineCount, e);
+        }
+      }
+    }
+    List<Pipeline> existingPipelines = pipelineManager.getPipelines(
+        repConfig, Pipeline.PipelineState.OPEN,
+        excludeList.getDatanodes(), excludeList.getPipelineIds());
+
+    PipelineRequestInformation pri =
+        PipelineRequestInformation.Builder.getBuilder().setSize(size).build();
+    while (existingPipelines.size() > 0) {
+      Pipeline pipeline =
+          pipelineChoosePolicy.choosePipeline(existingPipelines, pri);
+      if (pipeline == null) {
+        LOG.warn("Unable to select a pipeline from {} in the list",
+            existingPipelines.size());
+        break;
+      }
+      synchronized (pipeline.getId()) {
+        try {
+          ContainerInfo containerInfo = getContainerFromPipeline(pipeline);
+          // TODO - For EC, what is the block size? If the client says 128MB,
+          //        is that 128MB / 6 (for EC-6-3)?
+          if (containerInfo == null
+              || !containerHasSpace(containerInfo, size)) {
+            // This is O(n), which isn't great if there are a lot of pipelines
+            // and we keep finding pipelines without enough space.
+            existingPipelines.remove(pipeline);
+            pipelineManager.closePipeline(pipeline, true);
+          } else {
+            if (containerIsExcluded(containerInfo, excludeList)) {
+              existingPipelines.remove(pipeline);
+            } else {
+              return containerInfo;
+            }
+          }
+        } catch (PipelineNotFoundException | ContainerNotFoundException e) {
+          LOG.warn("Pipeline or container not found when selecting a writable "
+              + "container", e);
+          existingPipelines.remove(pipeline);
+          pipelineManager.closePipeline(pipeline, true);
+        }
+      }
+    }
+    // If we get here, all the pipelines we tried were no good. So try to
+    // allocate a new one and use it.
+    try {
+      synchronized (this) {
+        // TODO - PipelineManager should allow for creating a pipeline with
+        //        excluded nodes. HDDS-5375.
+        return allocateContainer(repConfig, size, owner);
+      }
+    } catch (IOException e) {
+      LOG.error("Unable to allocate a container for {} after trying all "
+          + "existing pipelines", repConfig, e);
+      return null;
+    }
+  }
+
+  private ContainerInfo allocateContainer(ReplicationConfig repConfig,
+      long size, String owner) throws IOException {
+    Pipeline newPipeline = pipelineManager.createPipeline(repConfig);
+    ContainerInfo container =
+        containerManager.getMatchingContainer(size, owner, newPipeline);
+    pipelineManager.openPipeline(newPipeline.getId());
+    return container;
+  }
+
+  private boolean containerIsExcluded(ContainerInfo container,
+      ExcludeList excludeList) {
+    return excludeList.getContainerIds().contains(container.containerID());
+  }
+
+  private ContainerInfo getContainerFromPipeline(Pipeline pipeline)
+      throws IOException {
+    // Assume the container is still open if the below method returns it. On
+    // container FINALIZE, ContainerManager will remove the container from the
+    // pipeline list in PipelineManager. Finalize can be triggered by a DN
+    // sending a message that the container is full, by a pipeline close, or
+    // by a stale / dead node event (which also closes the pipeline).
+    NavigableSet<ContainerID> containers =
+        pipelineManager.getContainersInPipeline(pipeline.getId());
+    // Assume 1 container per pipeline for EC
+    if (containers.size() == 0) {
+      return null;
+    }
+    ContainerID containerID = containers.first();
+    return containerManager.getContainer(containerID);
+  }
+
+  private boolean containerHasSpace(ContainerInfo container, long size) {
+    // The size passed from OM will be the cluster block size. Therefore we
+    // just check if the container has enough free space to accommodate another
+    // full block.
+    return container.getUsedBytes() + size <= containerSize;
+  }
+
+  private long getConfiguredContainerSize() {
+    return (long) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, BYTES);
+  }
+
+  /**
+   * Class to hold configuration for WriteableECContainerProvider.
+   */
+  @ConfigGroup(prefix = "ozone.scm.ec")
+  public static class WritableECContainerProviderConfig {
+
+    @Config(key = "pipeline.minimum",
+        defaultValue = "5",
+        type = ConfigType.INT,
+        description = "The minimum number of pipelines to have open for each " +
+            "Erasure Coding configuration",
+        tags = ConfigTag.STORAGE)
+    private int minimumPipelines = 5;
+
+    public int getMinimumPipelines() {
+      return minimumPipelines;
+    }
+
+    public void setMinimumPipelines(int minPipelines) {
+      this.minimumPipelines = minPipelines;
+    }
+
+  }
+
+}
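
The provider reads its settings through the ConfigGroup mechanism. A sketch of
obtaining the effective minimum (the test setup later in this commit does the
same; assuming an OzoneConfiguration named conf, and noting that with prefix
"ozone.scm.ec" and key "pipeline.minimum" this should map to the property
ozone.scm.ec.pipeline.minimum, defaulting to 5):

    WritableECContainerProvider.WritableECContainerProviderConfig providerConf =
        conf.getObject(
            WritableECContainerProvider.WritableECContainerProviderConfig.class);
    int minPipelines = providerConf.getMinimumPipelines();
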
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
index 013d3ff..b2cb636 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
 import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
@@ -183,6 +184,11 @@ public class TestSCMContainerPlacementRackAware {
         nodeManager, conf, cluster, false, metrics);
   }
 
+  @After
+  public void teardown() {
+    metrics.unRegister();
+  }
+
   @Test
   public void chooseNodeWithNoExcludedNodes() throws SCMException {
     // test choose new datanodes for new pipeline cases
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 791572f..9838e8b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -34,7 +34,7 @@ import java.util.stream.Stream;
 /**
  * Mock PipelineManager implementation for testing.
  */
-public final class MockPipelineManager implements PipelineManager {
+public class MockPipelineManager implements PipelineManager {
 
   private PipelineStateManager stateManager;
 
@@ -42,7 +42,7 @@ public final class MockPipelineManager implements PipelineManager {
     return new MockPipelineManager();
   }
 
-  private MockPipelineManager() {
+  MockPipelineManager() {
     this.stateManager = new PipelineStateManager();
   }
 
@@ -117,6 +117,19 @@ public final class MockPipelineManager implements PipelineManager {
   }
 
   @Override
+  /**
+   * Returns the count of pipelines meeting the given ReplicationConfig and
+   * state.
+   * @param replicationConfig The ReplicationConfig of the pipelines to count
+   * @param state The current state of the pipelines to count
+   * @return The count of pipelines meeting the above criteria
+   */
+  public int getPipelineCount(ReplicationConfig replicationConfig,
+      final Pipeline.PipelineState state) {
+    return stateManager.getPipelineCount(replicationConfig, state);
+  }
+
+  @Override
   public void addContainerToPipeline(final PipelineID pipelineID,
                                      final ContainerID containerID)
       throws IOException {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
new file mode 100644
index 0000000..ff20f57
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.ALLOCATED;
+
+/**
+ * Test for the ECPipelineProvider.
+ */
+public class TestECPipelineProvider {
+
+  private PipelineProvider provider;
+  private OzoneConfiguration conf;
+  private NodeManager nodeManager = Mockito.mock(NodeManager.class);
+  private StateManager stateManager = Mockito.mock(StateManager.class);
+  private PlacementPolicy placementPolicy = Mockito.mock(PlacementPolicy.class);
+
+  @Before
+  public void setup() throws IOException {
+    conf = new OzoneConfiguration();
+    provider = new ECPipelineProvider(
+        nodeManager, stateManager, conf, placementPolicy);
+
+    // Placement policy will always return the requested number of random nodes.
+    Mockito.when(placementPolicy.chooseDatanodes(Mockito.anyList(),
+        Mockito.anyList(), Mockito.anyInt(), Mockito.anyLong()))
+        .thenAnswer(invocation -> {
+          List<DatanodeDetails> dns = new ArrayList<>();
+          for (int i=0; i<(int)invocation.getArguments()[2]; i++) {
+            dns.add(MockDatanodeDetails.randomDatanodeDetails());
+          }
+          return dns;
+        });
+  }
+
+
+  @Test
+  public void testSimplePipelineCanBeCreatedWithIndexes() throws IOException {
+    ECReplicationConfig ecConf = new ECReplicationConfig(3, 2);
+    Pipeline pipeline = provider.create(ecConf);
+    Assert.assertEquals(EC, pipeline.getType());
+    Assert.assertEquals(ecConf.getData() + ecConf.getParity(),
+        pipeline.getNodes().size());
+    Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+    List<DatanodeDetails> dns = pipeline.getNodes();
+    for (int i=0; i<ecConf.getRequiredNodes(); i++) {
+      // EC DN indexes are numbered from 1 to N.
+      Assert.assertEquals(i+1, pipeline.getReplicaIndex(dns.get(i)));
+    }
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index a0bb44a..6f1dc5d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -209,11 +209,17 @@ public class TestPipelineManagerImpl {
     Assert.assertFalse(pipelineManager
         .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
             Pipeline.PipelineState.OPEN).contains(pipeline));
+    Assert.assertEquals(1, pipelineManager
+        .getPipelineCount(new RatisReplicationConfig(ReplicationFactor.THREE),
+            Pipeline.PipelineState.DORMANT));
 
     pipelineManager.activatePipeline(pipeline.getId());
     Assert.assertTrue(pipelineManager
         .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
             Pipeline.PipelineState.OPEN).contains(pipeline));
+    Assert.assertEquals(1, pipelineManager
+        .getPipelineCount(new RatisReplicationConfig(ReplicationFactor.THREE),
+            Pipeline.PipelineState.OPEN));
     buffer.flush();
     Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen());
     pipelineManager.close();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateMap.java
new file mode 100644
index 0000000..11ae86c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateMap.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for PipelineStateMap.
+ */
+
+public class TestPipelineStateMap {
+
+  private PipelineStateMap map;
+
+  @Before
+  public void setup() {
+    map = new PipelineStateMap();
+  }
+
+  @After
+  public void teardown() throws IOException {
+  }
+
+  @Test
+  public void testCountPipelines() throws IOException {
+    Pipeline p;
+
+    // Open Standalone pipelines
+    map.addPipeline(MockPipeline.createPipeline(1));
+    map.addPipeline(MockPipeline.createPipeline(1));
+    p = MockPipeline.createPipeline(1);
+    map.addPipeline(p);
+    map.updatePipelineState(p.getId(), Pipeline.PipelineState.CLOSED);
+
+    // Ratis pipeline
+    map.addPipeline(MockPipeline.createRatisPipeline());
+    p = MockPipeline.createRatisPipeline();
+    map.addPipeline(p);
+    map.updatePipelineState(p.getId(), Pipeline.PipelineState.CLOSED);
+
+    // EC Pipelines
+    map.addPipeline(MockPipeline.createEcPipeline(
+        new ECReplicationConfig(3, 2)));
+    map.addPipeline(MockPipeline.createEcPipeline(
+        new ECReplicationConfig(3, 2)));
+    p = MockPipeline.createEcPipeline(new ECReplicationConfig(3, 2));
+    map.addPipeline(p);
+    map.updatePipelineState(p.getId(), Pipeline.PipelineState.CLOSED);
+
+    assertEquals(2, map.getPipelineCount(new StandaloneReplicationConfig(ONE),
+        Pipeline.PipelineState.OPEN));
+    assertEquals(1, map.getPipelineCount(new RatisReplicationConfig(THREE),
+        Pipeline.PipelineState.OPEN));
+    assertEquals(2, map.getPipelineCount(new ECReplicationConfig(3, 2),
+        Pipeline.PipelineState.OPEN));
+
+    assertEquals(0, map.getPipelineCount(new ECReplicationConfig(6, 3),
+        Pipeline.PipelineState.OPEN));
+
+    assertEquals(1, map.getPipelineCount(new StandaloneReplicationConfig(ONE),
+        Pipeline.PipelineState.CLOSED));
+    assertEquals(1, map.getPipelineCount(new RatisReplicationConfig(THREE),
+        Pipeline.PipelineState.CLOSED));
+    assertEquals(1, map.getPipelineCount(new ECReplicationConfig(3, 2),
+        Pipeline.PipelineState.CLOSED));
+  }
+
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
new file mode 100644
index 0000000..ceaec5f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests to validate the WritableECContainerProvider works correctly.
+ */
+public class TestWritableECContainerProvider {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestWritableECContainerProvider.class);
+  private static final String OWNER = "SCM";
+  private PipelineManager pipelineManager;
+  private ContainerManagerV2 containerManager
+      = Mockito.mock(ContainerManagerV2.class);
+  private PipelineChoosePolicy pipelineChoosingPolicy
+      = new HealthyPipelineChoosePolicy();
+
+  private ConfigurationSource conf;
+  private WritableContainerProvider provider;
+  private ReplicationConfig repConfig;
+  private int minPipelines;
+
+  private Map<ContainerID, ContainerInfo> containers;
+
+  @Before
+  public void setup() throws ContainerNotFoundException {
+    repConfig = new ECReplicationConfig(3, 2);
+    conf = new OzoneConfiguration();
+    WritableECContainerProvider.WritableECContainerProviderConfig providerConf =
+        conf.getObject(WritableECContainerProvider
+            .WritableECContainerProviderConfig.class);
+    minPipelines = providerConf.getMinimumPipelines();
+    containers = new HashMap<>();
+    pipelineManager = MockPipelineManager.getInstance();
+    provider = new WritableECContainerProvider(
+        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+    Mockito.doAnswer(call -> {
+      Pipeline pipeline = (Pipeline)call.getArguments()[2];
+      ContainerInfo container = createContainer(pipeline,
+          repConfig, System.nanoTime());
+      pipelineManager.addContainerToPipeline(
+          pipeline.getId(), container.containerID());
+      containers.put(container.containerID(), container);
+      return container;
+    }).when(containerManager).getMatchingContainer(Matchers.anyLong(),
+        Matchers.anyString(), Matchers.any(Pipeline.class));
+
+    Mockito.doAnswer(call ->
+        containers.get((ContainerID)call.getArguments()[0]))
+        .when(containerManager).getContainer(Matchers.any(ContainerID.class));
+
+  }
+
+  @Test
+  public void testPipelinesCreatedUpToMinLimitAndRandomPipelineReturned()
+      throws IOException {
+    // The first minPipelines calls should each return a different container
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container =
+          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+      assertFalse(allocatedContainers.contains(container));
+      allocatedContainers.add(container);
+    }
+
+    allocatedContainers.clear();
+    for (int i=0; i<20; i++) {
+      ContainerInfo container =
+          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+      allocatedContainers.add(container);
+    }
+    // Should have created minPipelines open pipelines
+    assertEquals(minPipelines,
+        pipelineManager.getPipelines(repConfig, OPEN).size());
+    // We should have more than 1 container in the set, proving a random
+    // container is selected each time. Do not check for 5 here as there is a
+    // reasonable chance that in 20 turns we don't pick all 5 pipelines.
+    assertTrue(allocatedContainers.size() > 2);
+  }
+
+  @Test
+  public void testPipelineLimitIgnoresExcludedPipelines() throws IOException {
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container = provider.getContainer(
+          1, repConfig, OWNER, new ExcludeList());
+      allocatedContainers.add(container);
+    }
+    // We have the min limit of pipelines, but then exclude one. It should use
+    // one of the existing pipelines rather than creating a new one, as the
+    // limit is checked against all pipelines, not just the filtered list.
+    ExcludeList exclude = new ExcludeList();
+    PipelineID excludedID = allocatedContainers
+        .stream().findFirst().get().getPipelineID();
+    exclude.addPipeline(excludedID);
+
+    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
+    assertNotEquals(excludedID, c.getPipelineID());
+    assertTrue(allocatedContainers.contains(c));
+  }
+
+  @Test
+  public void testNewPipelineCreatedIfAllPipelinesExcluded()
+      throws IOException {
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container = provider.getContainer(
+          1, repConfig, OWNER, new ExcludeList());
+      allocatedContainers.add(container);
+    }
+    // We have the min limit of pipelines, but then exclude them all. A new
+    // pipeline should be created, as none of the existing pipelines can be
+    // used for this request.
+    ExcludeList exclude = new ExcludeList();
+    for (ContainerInfo c : allocatedContainers) {
+      exclude.addPipeline(c.getPipelineID());
+    }
+    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
+    assertFalse(allocatedContainers.contains(c));
+  }
+
+  @Test
+  public void testNewPipelineCreatedIfAllContainersExcluded()
+      throws IOException {
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container = provider.getContainer(
+          1, repConfig, OWNER, new ExcludeList());
+      allocatedContainers.add(container);
+    }
+    // We have the min limit of pipelines, but then exclude all of their
+    // containers. A new pipeline and container should be created, as none of
+    // the existing containers can be used for this request.
+    ExcludeList exclude = new ExcludeList();
+    for (ContainerInfo c : allocatedContainers) {
+      exclude.addConatinerId(c.containerID());
+    }
+    ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
+    assertFalse(allocatedContainers.contains(c));
+  }
+
+  @Test
+  public void testUnableToCreateAnyPipelinesReturnsNull() throws IOException {
+    pipelineManager = new MockPipelineManager() {
+      @Override
+      public Pipeline createPipeline(ReplicationConfig repConf)
+          throws IOException {
+        throw new IOException("Cannot create pipelines");
+      }
+    };
+    provider = new WritableECContainerProvider(
+        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+    ContainerInfo container =
+        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+    assertNull(container);
+  }
+
+  @Test
+  public void testExistingPipelineReturnedWhenNewCannotBeCreated()
+      throws IOException {
+    pipelineManager = new MockPipelineManager() {
+
+      private boolean throwError = false;
+
+      @Override
+      public Pipeline createPipeline(ReplicationConfig repConf)
+          throws IOException {
+        if (throwError) {
+          throw new IOException("Cannot create pipelines");
+        }
+        throwError = true;
+        return super.createPipeline(repConf);
+      }
+    };
+    provider = new WritableECContainerProvider(
+        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+    ContainerInfo container =
+        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+    for (int i=0; i<5; i++) {
+      ContainerInfo nextContainer =
+          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+      assertEquals(container, nextContainer);
+    }
+  }
+
+  @Test
+  public void testNewContainerAllocatedAndPipelinesClosedIfNoSpaceInExisting()
+      throws IOException {
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container =
+          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+      assertFalse(allocatedContainers.contains(container));
+      allocatedContainers.add(container);
+    }
+    // Update all the containers to make them full
+    for (ContainerInfo c : allocatedContainers) {
+      c.setUsedBytes(getMaxContainerSize());
+    }
+    // Get a new container and ensure it is not one of the original set
+    ContainerInfo newContainer =
+        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+    assertNotNull(newContainer);
+    assertFalse(allocatedContainers.contains(newContainer));
+    // The original pipelines should all be closed
+    for (ContainerInfo c : allocatedContainers) {
+      Pipeline pipeline = pipelineManager.getPipeline(c.getPipelineID());
+      assertEquals(CLOSED, pipeline.getPipelineState());
+    }
+  }
+
+  @Test
+  public void testPipelineNotFoundWhenAttemptingToUseExisting()
+      throws IOException {
+    // Ensure PM throws PNF exception when we ask for the containers in the
+    // pipeline
+    pipelineManager = new MockPipelineManager() {
+
+      @Override
+      public NavigableSet<ContainerID> getContainersInPipeline(
+          PipelineID pipelineID) throws IOException {
+        throw new PipelineNotFoundException("Simulated exception");
+      }
+    };
+    provider = new WritableECContainerProvider(
+        conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container =
+          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+      assertFalse(allocatedContainers.contains(container));
+      allocatedContainers.add(container);
+    }
+    // Now attempt to get a container - any attempt to use an existing one
+    // will throw PNF and then we must allocate a new one
+    ContainerInfo newContainer =
+        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+    assertNotNull(newContainer);
+    assertFalse(allocatedContainers.contains(newContainer));
+  }
+
+  @Test
+  public void testContainerNotFoundWhenAttemptingToUseExisting()
+      throws IOException {
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container =
+          provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+      assertFalse(allocatedContainers.contains(container));
+      allocatedContainers.add(container);
+    }
+
+    // Ensure ContainerManager always throws when a container is requested so
+    // existing pipelines cannot be used
+    Mockito.doAnswer(call -> {
+      throw new ContainerNotFoundException();
+    }).when(containerManager).getContainer(Matchers.any(ContainerID.class));
+
+    ContainerInfo newContainer =
+        provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+    assertNotNull(newContainer);
+    assertFalse(allocatedContainers.contains(newContainer));
+
+    // Ensure all the existing pipelines are closed
+    for (ContainerInfo c : allocatedContainers) {
+      Pipeline pipeline = pipelineManager.getPipeline(c.getPipelineID());
+      assertEquals(CLOSED, pipeline.getPipelineState());
+    }
+  }
+
+  @Test
+  public void testPipelineOpenButContainerRemovedFromIt() throws IOException {
+    // This can happen if the container close process is triggered from the DN.
+    // When that happens, CM will change the container state to CLOSING and
+    // remove it from the container list in PipelineManager.
+    Set<ContainerInfo> allocatedContainers = new HashSet<>();
+    for (int i=0; i<minPipelines; i++) {
+      ContainerInfo container = provider.getContainer(
+          1, repConfig, OWNER, new ExcludeList());
+      assertFalse(allocatedContainers.contains(container));
+      allocatedContainers.add(container);
+      // Remove the container from the pipeline to simulate closing it
+      pipelineManager.removeContainerFromPipeline(
+          container.getPipelineID(), container.containerID());
+    }
+    ContainerInfo newContainer = provider.getContainer(
+        1, repConfig, OWNER, new ExcludeList());
+    assertFalse(allocatedContainers.contains(newContainer));
+    for (ContainerInfo c : allocatedContainers) {
+      Pipeline pipeline = pipelineManager.getPipeline(c.getPipelineID());
+      assertEquals(CLOSED, pipeline.getPipelineState());
+    }
+  }
+
+  private ContainerInfo createContainer(Pipeline pipeline,
+      ReplicationConfig repConf, long containerID) {
+    return new ContainerInfo.Builder()
+        .setContainerID(containerID)
+        .setOwner(OWNER)
+        .setReplicationConfig(repConf)
+        .setState(HddsProtos.LifeCycleState.OPEN)
+        .setPipelineID(pipeline.getId())
+        .setNumberOfKeys(0)
+        .setUsedBytes(0)
+        .setSequenceId(0)
+        .setDeleteTransactionId(0)
+        .build();
+  }
+
+  private long getMaxContainerSize() {
+    return (long)conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, BYTES);
+  }
+
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org