You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2021/06/25 19:01:33 UTC
[ozone] branch HDDS-3816-ec updated: HDDS-4892. EC: Implement basic
EC pipeline provider (#2353)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-3816-ec by this push:
new 206c935 HDDS-4892. EC: Implement basic EC pipeline provider (#2353)
206c935 is described below
commit 206c9358a5687e3c33f1d31e1c00f3e2d6130d1d
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Fri Jun 25 20:01:17 2021 +0100
HDDS-4892. EC: Implement basic EC pipeline provider (#2353)
---
.../apache/hadoop/hdds/scm/pipeline/Pipeline.java | 6 +
.../hadoop/hdds/scm/pipeline/MockPipeline.java | 16 +-
.../hadoop/hdds/scm/pipeline/TestPipeline.java | 6 +
.../algorithms/SCMContainerPlacementMetrics.java | 10 +
.../hdds/scm/pipeline/ECPipelineProvider.java | 88 +++++
.../hadoop/hdds/scm/pipeline/PipelineFactory.java | 16 +
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 11 +
.../hdds/scm/pipeline/PipelineManagerV2Impl.java | 13 +
.../hdds/scm/pipeline/PipelineStateManager.java | 13 +
.../scm/pipeline/PipelineStateManagerV2Impl.java | 20 ++
.../hadoop/hdds/scm/pipeline/PipelineStateMap.java | 30 ++
.../hdds/scm/pipeline/SCMPipelineManager.java | 11 +
.../hadoop/hdds/scm/pipeline/StateManager.java | 12 +
.../scm/pipeline/WritableContainerFactory.java | 8 +
.../scm/pipeline/WritableECContainerProvider.java | 229 +++++++++++++
.../TestSCMContainerPlacementRackAware.java | 6 +
.../hdds/scm/pipeline/MockPipelineManager.java | 17 +-
.../hdds/scm/pipeline/TestECPipelineProvider.java | 84 +++++
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 6 +
.../hdds/scm/pipeline/TestPipelineStateMap.java | 95 ++++++
.../pipeline/TestWritableECContainerProvider.java | 372 +++++++++++++++++++++
21 files changed, 1059 insertions(+), 10 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
index 33bb70a..638180c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/pipeline/Pipeline.java
@@ -263,6 +263,12 @@ public final class Pipeline {
}
public boolean isHealthy() {
+ // EC pipelines are not reported by the DN and do not have a leader. If a
+ // node goes stale or dead, EC pipelines will be closed like RATIS pipelines
+ // but at the current time there are no other health metrics for EC.
+ if (replicationConfig.getReplicationType() == ReplicationType.EC) {
+ return true;
+ }
for (Long reportedTime : nodeStatus.values()) {
if (reportedTime < 0) {
return false;
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
index 259b57c..b2225d2 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipeline.java
@@ -91,14 +91,15 @@ public final class MockPipeline {
}
public static Pipeline createEcPipeline() {
+ return createEcPipeline(new ECReplicationConfig(3, 2));
+ }
- List<DatanodeDetails> nodes = new ArrayList<>();
- nodes.add(MockDatanodeDetails.randomDatanodeDetails());
- nodes.add(MockDatanodeDetails.randomDatanodeDetails());
- nodes.add(MockDatanodeDetails.randomDatanodeDetails());
- nodes.add(MockDatanodeDetails.randomDatanodeDetails());
- nodes.add(MockDatanodeDetails.randomDatanodeDetails());
+ public static Pipeline createEcPipeline(ECReplicationConfig repConfig) {
+ List<DatanodeDetails> nodes = new ArrayList<>();
+ for (int i=0; i<repConfig.getRequiredNodes(); i++) {
+ nodes.add(MockDatanodeDetails.randomDatanodeDetails());
+ }
Map<DatanodeDetails, Integer> nodeIndexes = new HashMap<>();
int index = nodes.size() - 1;
@@ -110,8 +111,7 @@ public final class MockPipeline {
return Pipeline.newBuilder()
.setState(Pipeline.PipelineState.OPEN)
.setId(PipelineID.randomId())
- .setReplicationConfig(
- new ECReplicationConfig(3, 2))
+ .setReplicationConfig(repConfig)
.setNodes(nodes)
.setReplicaIndexes(nodeIndexes)
.build();
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipeline.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipeline.java
index 0b9e9fe..cf7dd4d 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipeline.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipeline.java
@@ -81,4 +81,10 @@ public class TestPipeline {
reloadedPipeline.getReplicaIndex(dn));
}
}
+
+ /**
+ * An EC pipeline built by MockPipeline.createEcPipeline() must report
+ * itself as healthy.
+ */
+ @Test
+ public void testECPipelineIsAlwaysHealthy() throws IOException {
+ Pipeline pipeline = MockPipeline.createEcPipeline();
+ Assert.assertTrue(pipeline.isHealthy());
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementMetrics.java
index 1ca68bd..b4a1436 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/SCMContainerPlacementMetrics.java
@@ -53,8 +53,18 @@ public class SCMContainerPlacementMetrics implements MetricsSource {
public SCMContainerPlacementMetrics() {
}
+ /**
+ * Return the existing instance of SCMContainerPlacementMetrics if it is
+ * registered in the metrics System, otherwise create a new instance, register
+ * it and return it.
+ * @return A new or existing SCMContainerPlacementMetrics object
+ */
public static SCMContainerPlacementMetrics create() {
MetricsSystem ms = DefaultMetricsSystem.instance();
+ MetricsSource existingSource = ms.getSource(SOURCE_NAME);
+ if (existingSource != null) {
+ return (SCMContainerPlacementMetrics)existingSource;
+ }
registry = new MetricsRegistry(RECORD_INFO);
return ms.register(SOURCE_NAME, "SCM Container Placement Metrics",
new SCMContainerPlacementMetrics());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
new file mode 100644
index 0000000..5ce442f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/ECPipelineProvider.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class to create pipelines for EC containers.
+ *
+ * <p>Pipelines are built in the ALLOCATED state with a random ID, and every
+ * datanode in the pipeline is given a 1-based replica index that follows the
+ * order of the node list.
+ */
+public class ECPipelineProvider extends PipelineProvider<ECReplicationConfig> {
+
+ // TODO - EC Placement Policy. Standard Network Aware topology will not work
+ // for EC as it stands. We may want an "as many racks as possible"
+ // policy. HDDS-5326.
+
+ // NOTE(review): conf is stored but not read anywhere in this class as shown.
+ private final ConfigurationSource conf;
+ private final PlacementPolicy placementPolicy;
+
+ /**
+ * @param nodeManager passed through to the PipelineProvider super class
+ * @param stateManager passed through to the PipelineProvider super class
+ * @param conf configuration source kept by this provider
+ * @param placementPolicy policy used to pick the datanodes of new pipelines
+ */
+ public ECPipelineProvider(NodeManager nodeManager,
+ StateManager stateManager,
+ ConfigurationSource conf,
+ PlacementPolicy placementPolicy) {
+ super(nodeManager, stateManager);
+ this.conf = conf;
+ this.placementPolicy = placementPolicy;
+ }
+
+ /**
+ * Asks the placement policy for as many datanodes as the replication config
+ * requires and builds a pipeline from them.
+ *
+ * @param replicationConfig the EC configuration of the new pipeline
+ * @return the newly built pipeline
+ * @throws IOException declared by the signature; presumably raised when the
+ * placement policy cannot supply the nodes - confirm
+ */
+ @Override
+ protected Pipeline create(ECReplicationConfig replicationConfig)
+ throws IOException {
+ // NOTE(review): the two null arguments are presumably the excluded and
+ // favored node lists and the trailing 0 the required size - confirm
+ // against PlacementPolicy.chooseDatanodes.
+ List<DatanodeDetails> dns = placementPolicy.chooseDatanodes(null,
+ null, replicationConfig.getRequiredNodes(), 0);
+ return create(replicationConfig, dns);
+ }
+
+ /**
+ * Builds an ALLOCATED pipeline over the given nodes.
+ *
+ * @param replicationConfig the EC configuration of the new pipeline
+ * @param nodes the datanodes to use; the list order decides the replica
+ * index of each node
+ * @return the newly built pipeline
+ */
+ @Override
+ protected Pipeline create(ECReplicationConfig replicationConfig,
+ List<DatanodeDetails> nodes) {
+
+ Map<DatanodeDetails, Integer> dnIndexes = new HashMap<>();
+ // Replica indexes start at 1 and increase in list order.
+ int ecIndex = 1;
+ for (DatanodeDetails dn : nodes) {
+ dnIndexes.put(dn, ecIndex);
+ ecIndex++;
+ }
+
+ return Pipeline.newBuilder()
+ .setId(PipelineID.randomId())
+ .setState(Pipeline.PipelineState.ALLOCATED)
+ .setReplicationConfig(replicationConfig)
+ .setNodes(nodes)
+ .setReplicaIndexes(dnIndexes)
+ .build();
+ }
+
+ /**
+ * Intentionally empty: this provider performs no work when a pipeline is
+ * closed.
+ */
+ @Override
+ protected void close(Pipeline pipeline) throws IOException {
+ }
+
+ /**
+ * Intentionally empty: this provider performs no work on shutdown.
+ */
+ @Override
+ protected void shutdown() {
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 0fbabee..b70b1a4 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -23,6 +23,10 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
+import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -50,6 +54,18 @@ public class PipelineFactory {
new RatisPipelineProvider(nodeManager,
stateManager, conf,
eventPublisher, scmContext));
+ PlacementPolicy placementPolicy;
+ try {
+ placementPolicy = ContainerPlacementPolicyFactory.getPolicy(conf,
+ nodeManager, nodeManager.getClusterNetworkTopologyMap(), true,
+ SCMContainerPlacementMetrics.create());
+ } catch (SCMException e) {
+ throw new RuntimeException("Unable to get the container placement policy",
+ e);
+ }
+ providers.put(ReplicationType.EC,
+ new ECPipelineProvider(nodeManager, stateManager, conf,
+ placementPolicy));
}
protected PipelineFactory() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index ad8faca..6a35249 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -65,6 +65,17 @@ public interface PipelineManager extends Closeable, PipelineManagerMXBean {
Collection<PipelineID> excludePipelines
);
+ /**
+ * Returns the count of pipelines meeting the given ReplicationConfig and
+ * state.
+ * @param replicationConfig The ReplicationConfig of the pipelines to count
+ * @param state The current state of the pipelines to count
+ * @return The count of pipelines meeting the above criteria
+ */
+ int getPipelineCount(
+ ReplicationConfig replicationConfig, Pipeline.PipelineState state
+ );
+
void addContainerToPipeline(PipelineID pipelineID, ContainerID containerID)
throws IOException;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index b7d8864..c383093 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -234,6 +234,19 @@ public class PipelineManagerV2Impl implements PipelineManager {
}
+ /**
+ * Returns the count of pipelines meeting the given ReplicationConfig and
+ * state. The call is delegated to the state manager.
+ * @param config The ReplicationConfig of the pipelines to count
+ * @param state The current state of the pipelines to count
+ * @return The count of pipelines meeting the above criteria
+ */
@Override
+ public int getPipelineCount(ReplicationConfig config,
+ Pipeline.PipelineState state) {
+ return stateManager.getPipelineCount(config, state);
+ }
+
+ @Override
public void addContainerToPipeline(
PipelineID pipelineID, ContainerID containerID) throws IOException {
// should not lock here, since no ratis operation happens.
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
index c3df3d2..83e4ae5 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManager.java
@@ -98,6 +98,19 @@ public class PipelineStateManager implements StateManager {
}
+ /**
+ * Returns the count of pipelines meeting the given ReplicationConfig and
+ * state. The call is delegated to the pipeline state map.
+ * @param replicationConfig The ReplicationConfig of the pipelines to count
+ * @param state The current state of the pipelines to count
+ * @return The count of pipelines meeting the above criteria
+ */
@Override
+ public int getPipelineCount(ReplicationConfig replicationConfig,
+ PipelineState state) {
+ return pipelineStateMap.getPipelineCount(replicationConfig, state);
+ }
+
+ @Override
public NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
throws IOException {
return pipelineStateMap.getContainers(pipelineID);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
index 7f52d93..2c1ef65 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateManagerV2Impl.java
@@ -176,6 +176,26 @@ public class PipelineStateManagerV2Impl implements StateManager {
}
}
+
+ /**
+ * Returns the count of pipelines meeting the given ReplicationConfig and
+ * state. The pipeline state map is queried while the read lock is held, and
+ * the lock is released in the finally block even if the lookup throws.
+ * @param replicationConfig The ReplicationConfig of the pipelines to count
+ * @param state The current state of the pipelines to count
+ * @return The count of pipelines meeting the above criteria
+ */
+ @Override
+ public int getPipelineCount(
+ ReplicationConfig replicationConfig,
+ Pipeline.PipelineState state) {
+ lock.readLock().lock();
+ try {
+ return pipelineStateMap.getPipelineCount(replicationConfig, state);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
@Override
public NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
throws IOException {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index ac95b07..6d42777 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -184,6 +184,36 @@ class PipelineStateMap {
}
/**
+ * Get a count of pipelines with the given replicationConfig and state.
+ * This method is most efficient when getting a count for OPEN pipelines
+ * as the result can be obtained directly from the cached open list.
+ *
+ * @param replicationConfig - ReplicationConfig
+ * @param state - Required PipelineState
+ * @return Count of pipelines with the specified replication config and state
+ */
+ int getPipelineCount(ReplicationConfig replicationConfig,
+ PipelineState state) {
+ Preconditions
+ .checkNotNull(replicationConfig, "ReplicationConfig cannot be null");
+ Preconditions.checkNotNull(state, "Pipeline state cannot be null");
+
+ if (state == PipelineState.OPEN) {
+ return query2OpenPipelines.getOrDefault(
+ replicationConfig, Collections.emptyList()).size();
+ }
+
+ int count = 0;
+ for (Pipeline pipeline : pipelineMap.values()) {
+ if (pipeline.getReplicationConfig().equals(replicationConfig)
+ && pipeline.getPipelineState() == state) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ /**
* Get list of pipeline corresponding to specified replication type,
* replication factor and pipeline state.
*
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index c68a14f..cb2abb8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -380,6 +380,17 @@ public class SCMPipelineManager implements
}
+ /**
+ * Returns the count of pipelines meeting the given ReplicationConfig and
+ * state. The state manager is queried while the read lock is held, and the
+ * lock is released in the finally block even if the lookup throws.
+ * @param replicationConfig The ReplicationConfig of the pipelines to count
+ * @param state The current state of the pipelines to count
+ * @return The count of pipelines meeting the above criteria
+ */
@Override
+ public int getPipelineCount(ReplicationConfig replicationConfig,
+ Pipeline.PipelineState state) {
+ lock.readLock().lock();
+ try {
+ return stateManager.getPipelineCount(replicationConfig, state);
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ @Override
public void addContainerToPipeline(PipelineID pipelineID,
ContainerID containerID) throws IOException {
lock.writeLock().lock();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java
index c075dea..c3fedbe 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/StateManager.java
@@ -91,6 +91,18 @@ public interface StateManager {
Collection<PipelineID> excludePipelines
);
+ /**
+ * Returns the count of pipelines meeting the given ReplicationConfig and
+ * state.
+ * @param replicationConfig The ReplicationConfig of the pipelines to count
+ * @param state The current state of the pipelines to count
+ * @return The count of pipelines meeting the above criteria
+ */
+ int getPipelineCount(
+ ReplicationConfig replicationConfig,
+ Pipeline.PipelineState state
+ );
+
NavigableSet<ContainerID> getContainers(PipelineID pipelineID)
throws IOException;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
index 356d047..2e74d96 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableContainerFactory.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.pipeline;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -33,12 +34,16 @@ public class WritableContainerFactory {
private final WritableContainerProvider<ReplicationConfig> ratisProvider;
private final WritableContainerProvider<ReplicationConfig> standaloneProvider;
+ private final WritableContainerProvider<ECReplicationConfig> ecProvider;
public WritableContainerFactory(StorageContainerManager scm) {
this.ratisProvider = new WritableRatisContainerProvider(
scm.getConfiguration(), scm.getPipelineManager(),
scm.getContainerManager(), scm.getPipelineChoosePolicy());
this.standaloneProvider = ratisProvider;
+ this.ecProvider = new WritableECContainerProvider(scm.getConfiguration(),
+ scm.getPipelineManager(), scm.getContainerManager(),
+ scm.getPipelineChoosePolicy());
}
public ContainerInfo getContainer(final long size,
@@ -50,6 +55,9 @@ public class WritableContainerFactory {
.getContainer(size, repConfig, owner, excludeList);
case RATIS:
return ratisProvider.getContainer(size, repConfig, owner, excludeList);
+ case EC:
+ return ecProvider.getContainer(size, (ECReplicationConfig)repConfig,
+ owner, excludeList);
default:
throw new IOException(repConfig.getReplicationType()
+ " is an invalid replication type");
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
new file mode 100644
index 0000000..2f3cf92
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/WritableECContainerProvider.java
@@ -0,0 +1,229 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.PipelineRequestInformation;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.NavigableSet;
+
+import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
+
+/**
+ * Writable Container provider to obtain a writable container for EC pipelines.
+ *
+ * <p>A request is served in three steps: top up the number of OPEN pipelines
+ * for the requested EC configuration to the configured minimum, otherwise
+ * reuse the container of an existing OPEN pipeline, otherwise allocate a new
+ * pipeline and container.
+ */
+public class WritableECContainerProvider
+ implements WritableContainerProvider<ECReplicationConfig> {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(WritableECContainerProvider.class);
+
+ private final ConfigurationSource conf;
+ private final PipelineManager pipelineManager;
+ private final PipelineChoosePolicy pipelineChoosePolicy;
+ private final ContainerManagerV2 containerManager;
+ // Container size in bytes, read once from the configuration at construction.
+ private final long containerSize;
+ private final WritableECContainerProviderConfig providerConfig;
+
+ /**
+ * @param conf configuration source; also used to load the
+ * WritableECContainerProviderConfig and the container size
+ * @param pipelineManager used to count, list, create, open and close
+ * pipelines
+ * @param containerManager used to look up and match containers
+ * @param pipelineChoosePolicy policy that picks one pipeline from the list
+ * of open candidates
+ */
+ public WritableECContainerProvider(ConfigurationSource conf,
+ PipelineManager pipelineManager, ContainerManagerV2 containerManager,
+ PipelineChoosePolicy pipelineChoosePolicy) {
+ this.conf = conf;
+ this.providerConfig =
+ conf.getObject(WritableECContainerProviderConfig.class);
+ this.pipelineManager = pipelineManager;
+ this.containerManager = containerManager;
+ this.pipelineChoosePolicy = pipelineChoosePolicy;
+ this.containerSize = getConfiguredContainerSize();
+ }
+
+ /**
+ * Returns a container that can accept an EC write of the given size.
+ *
+ * @param size The max size of block in bytes which will be written. This
+ * comes from Ozone Manager and will be the block size configured
+ * for the cluster. The client cannot pass an arbitrary value
+ * for this setting.
+ * @param repConfig The replication config indicating the EC data and parity
+ * block counts.
+ * @param owner The owner of the container
+ * @param excludeList A set of datanodes, containers and pipelines which
+ * should not be considered.
+ * @return A containerInfo representing a block group with space for the
+ * write, or null if no container can be allocated.
+ * @throws IOException if a call into the pipeline or container manager fails
+ * with anything other than the not-found exceptions caught in the
+ * loop or the allocation failures that are logged.
+ */
+ @Override
+ public ContainerInfo getContainer(final long size,
+ ECReplicationConfig repConfig, String owner, ExcludeList excludeList)
+ throws IOException {
+ // Step 1: while fewer than the configured minimum of pipelines are OPEN
+ // for this config, allocate a new pipeline and container. The check and
+ // the allocation run under this provider's monitor, presumably so that
+ // concurrent callers do not all allocate at once - confirm.
+ synchronized(this) {
+ int openPipelineCount = pipelineManager.getPipelineCount(repConfig,
+ Pipeline.PipelineState.OPEN);
+ if (openPipelineCount < providerConfig.getMinimumPipelines()) {
+ try {
+ // TODO - PipelineManager should allow for creating a pipeline with
+ // excluded nodes. HDDS-5375.
+ return allocateContainer(repConfig, size, owner);
+ } catch (IOException e) {
+ // Not fatal: fall through and try the existing pipelines instead.
+ // NOTE(review): the second placeholder is the OPEN pipeline count
+ // although the message text says "containers".
+ LOG.warn("Unable to allocate a container for {} with {} existing "
+ + "containers", repConfig, openPipelineCount, e);
+ }
+ }
+ }
+ // Step 2: try the OPEN pipelines which are not excluded by datanode or
+ // pipeline ID.
+ List<Pipeline> existingPipelines = pipelineManager.getPipelines(
+ repConfig, Pipeline.PipelineState.OPEN,
+ excludeList.getDatanodes(), excludeList.getPipelineIds());
+
+ PipelineRequestInformation pri =
+ PipelineRequestInformation.Builder.getBuilder().setSize(size).build();
+ // Every iteration either returns, breaks or removes one pipeline from the
+ // list, so the loop terminates.
+ while (existingPipelines.size() > 0) {
+ Pipeline pipeline =
+ pipelineChoosePolicy.choosePipeline(existingPipelines, pri);
+ if (pipeline == null) {
+ LOG.warn("Unable to select a pipeline from {} in the list",
+ existingPipelines.size());
+ break;
+ }
+ // NOTE(review): locking on the object returned by getId() only excludes
+ // other threads if they obtain the same PipelineID instance - confirm.
+ synchronized (pipeline.getId()) {
+ try {
+ ContainerInfo containerInfo = getContainerFromPipeline(pipeline);
+ // TODO - For EC, what is the block size? If the client says 128MB,
+ // is that 128MB / 6 (for EC-6-3?)
+ if (containerInfo == null
+ || !containerHasSpace(containerInfo, size)) {
+ // The pipeline has no container or its container is too full, so
+ // drop it from the candidates and close it.
+ // This is O(n), which isn't great if there are a lot of pipelines
+ // and we keep finding pipelines without enough space.
+ existingPipelines.remove(pipeline);
+ pipelineManager.closePipeline(pipeline, true);
+ } else {
+ if (containerIsExcluded(containerInfo, excludeList)) {
+ // Usable in general but excluded for this request: skip it
+ // without closing the pipeline.
+ existingPipelines.remove(pipeline);
+ } else {
+ return containerInfo;
+ }
+ }
+ } catch(PipelineNotFoundException | ContainerNotFoundException e){
+ LOG.warn("Pipeline or container not found when selecting a writable "
+ + "container", e);
+ existingPipelines.remove(pipeline);
+ pipelineManager.closePipeline(pipeline, true);
+ }
+ }
+ }
+ // Step 3:
+ // If we get here, all the pipelines we tried were no good. So try to
+ // allocate a new one and use it.
+ try {
+ synchronized(this) {
+ // TODO - PipelineManager should allow for creating a pipeline with
+ // excluded nodes. HDDS-5375.
+ return allocateContainer(repConfig, size, owner);
+ }
+ } catch (IOException e) {
+ LOG.error("Unable to allocate a container for {} after trying all "
+ + "existing containers", repConfig, e);
+ return null;
+ }
+ }
+
+ /**
+ * Creates a new pipeline for the given config, obtains a container on it
+ * from the container manager and then opens the pipeline.
+ *
+ * NOTE(review): if getMatchingContainer throws, the newly created pipeline
+ * is neither opened nor closed here - confirm it is cleaned up elsewhere.
+ */
+ private ContainerInfo allocateContainer(ReplicationConfig repConfig,
+ long size, String owner) throws IOException {
+ Pipeline newPipeline = pipelineManager.createPipeline(repConfig);
+ ContainerInfo container =
+ containerManager.getMatchingContainer(size, owner, newPipeline);
+ pipelineManager.openPipeline(newPipeline.getId());
+ return container;
+ }
+
+ /**
+ * Returns true if the container's ID is in the exclude list's container IDs.
+ */
+ private boolean containerIsExcluded(ContainerInfo container,
+ ExcludeList excludeList) {
+ return excludeList.getContainerIds().contains(container.containerID());
+ }
+
+ /**
+ * Returns the first container registered against the pipeline, or null if
+ * the pipeline has no containers.
+ */
+ private ContainerInfo getContainerFromPipeline(Pipeline pipeline)
+ throws IOException {
+ // Assume the container is still open if the below method returns it. On
+ // container FINALIZE, ContainerManager will remove the container from the
+ // pipeline list in PipelineManager. Finalize can be triggered by a DN
+ // sending a message that the container is full, or on close pipeline or
+ // on a stale / dead node event (via close pipeline).
+ NavigableSet<ContainerID> containers =
+ pipelineManager.getContainersInPipeline(pipeline.getId());
+ // Assume 1 container per pipeline for EC
+ if (containers.size() == 0) {
+ return null;
+ }
+ ContainerID containerID = containers.first();
+ return containerManager.getContainer(containerID);
+ }
+
+ /**
+ * Returns true if the container's used bytes plus the requested size do not
+ * exceed the configured container size.
+ */
+ private boolean containerHasSpace(ContainerInfo container, long size) {
+ // The size passed from OM will be the cluster block size. Therefore we
+ // just check if the container has enough free space to accommodate another
+ // full block.
+ return container.getUsedBytes() + size <= containerSize;
+ }
+
+ /**
+ * Reads the configured container size from the configuration, in bytes.
+ */
+ private long getConfiguredContainerSize() {
+ return (long) conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, BYTES);
+ }
+
+ /**
+ * Class to hold configuration for WritableECContainerProvider.
+ */
+ @ConfigGroup(prefix = "ozone.scm.ec")
+ public static class WritableECContainerProviderConfig {
+
+ // Presumably resolved as "ozone.scm.ec.pipeline.minimum" (group prefix plus
+ // key) - confirm against the config framework.
+ @Config(key = "pipeline.minimum",
+ defaultValue = "5",
+ type = ConfigType.INT,
+ description = "The minimum number of pipelines to have open for each " +
+ "Erasure Coding configuration",
+ tags = ConfigTag.STORAGE)
+ private int minimumPipelines = 5;
+
+ public int getMinimumPipelines() {
+ return minimumPipelines;
+ }
+
+ public void setMinimumPipelines(int minPipelines) {
+ this.minimumPipelines = minPipelines;
+ }
+
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
index 013d3ff..b2cb636 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementRackAware.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -183,6 +184,11 @@ public class TestSCMContainerPlacementRackAware {
nodeManager, conf, cluster, false, metrics);
}
+ /**
+ * Calls unRegister() on the metrics object after every test. Presumably this
+ * removes the metrics source so that the next test can register a fresh one
+ * - confirm against SCMContainerPlacementMetrics.
+ */
+ @After
+ public void teardown() {
+ metrics.unRegister();
+ }
+
@Test
public void chooseNodeWithNoExcludedNodes() throws SCMException {
// test choose new datanodes for new pipeline cases
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 791572f..9838e8b 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -34,7 +34,7 @@ import java.util.stream.Stream;
/**
* Mock PipelineManager implementation for testing.
*/
-public final class MockPipelineManager implements PipelineManager {
+public class MockPipelineManager implements PipelineManager {
private PipelineStateManager stateManager;
@@ -42,7 +42,7 @@ public final class MockPipelineManager implements PipelineManager {
return new MockPipelineManager();
}
- private MockPipelineManager() {
+ MockPipelineManager() {
this.stateManager = new PipelineStateManager();
}
@@ -117,6 +117,19 @@ public final class MockPipelineManager implements PipelineManager {
}
+ /**
+ * Returns the count of pipelines meeting the given ReplicationConfig and
+ * state.
+ * @param replicationConfig The ReplicationConfig of the pipelines to count
+ * @param state The current state of the pipelines to count
+ * @return The count of pipelines meeting the above criteria
+ */
+ @Override
+ public int getPipelineCount(ReplicationConfig replicationConfig,
+ final Pipeline.PipelineState state) {
+ return stateManager.getPipelineCount(replicationConfig, state);
+ }
+
@Override
public void addContainerToPipeline(final PipelineID pipelineID,
final ContainerID containerID)
throws IOException {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
new file mode 100644
index 0000000..ff20f57
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestECPipelineProvider.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.EC;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.ALLOCATED;
+
+/**
+ * Test for the ECPipelineProvider.
+ */
+public class TestECPipelineProvider {
+
+ private PipelineProvider provider;
+ private OzoneConfiguration conf;
+ private NodeManager nodeManager = Mockito.mock(NodeManager.class);
+ private StateManager stateManager = Mockito.mock(StateManager.class);
+ private PlacementPolicy placementPolicy = Mockito.mock(PlacementPolicy.class);
+
+ @Before
+ public void setup() throws IOException {
+ conf = new OzoneConfiguration();
+ provider = new ECPipelineProvider(
+ nodeManager, stateManager, conf, placementPolicy);
+
+ // Placement policy will always return EC number of random nodes.
+ Mockito.when(placementPolicy.chooseDatanodes(Mockito.anyList(),
+ Mockito.anyList(), Mockito.anyInt(), Mockito.anyLong()))
+ .thenAnswer(invocation -> {
+ List<DatanodeDetails> dns = new ArrayList<>();
+ for (int i=0; i<(int)invocation.getArguments()[2]; i++) {
+ dns.add(MockDatanodeDetails.randomDatanodeDetails());
+ }
+ return dns;
+ });
+ }
+
+
+ @Test
+ public void testSimplePipelineCanBeCreatedWithIndexes() throws IOException {
+ ECReplicationConfig ecConf = new ECReplicationConfig(3, 2);
+ Pipeline pipeline = provider.create(ecConf);
+ Assert.assertEquals(EC, pipeline.getType());
+ Assert.assertEquals(ecConf.getData() + ecConf.getParity(),
+ pipeline.getNodes().size());
+ Assert.assertEquals(ALLOCATED, pipeline.getPipelineState());
+ List<DatanodeDetails> dns = pipeline.getNodes();
+ for (int i=0; i<ecConf.getRequiredNodes(); i++) {
+ // EC DN indexes are numbered starting from 1 to N.
+ Assert.assertEquals(i+1, pipeline.getReplicaIndex(dns.get(i)));
+ }
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index a0bb44a..6f1dc5d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -209,11 +209,17 @@ public class TestPipelineManagerImpl {
Assert.assertFalse(pipelineManager
.getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
Pipeline.PipelineState.OPEN).contains(pipeline));
+ Assert.assertEquals(1, pipelineManager
+ .getPipelineCount(new RatisReplicationConfig(ReplicationFactor.THREE),
+ Pipeline.PipelineState.DORMANT));
pipelineManager.activatePipeline(pipeline.getId());
Assert.assertTrue(pipelineManager
.getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
Pipeline.PipelineState.OPEN).contains(pipeline));
+ Assert.assertEquals(1, pipelineManager
+ .getPipelineCount(new RatisReplicationConfig(ReplicationFactor.THREE),
+ Pipeline.PipelineState.OPEN));
buffer.flush();
Assert.assertTrue(pipelineStore.get(pipeline.getId()).isOpen());
pipelineManager.close();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateMap.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateMap.java
new file mode 100644
index 0000000..11ae86c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineStateMap.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.client.StandaloneReplicationConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for PipelineStateMap.
+ */
+
+public class TestPipelineStateMap {
+
+ private PipelineStateMap map;
+
+ @Before
+ public void setup() {
+ map = new PipelineStateMap();
+ }
+
+ @After
+ public void teardown() throws IOException {
+ }
+
+ @Test
+ public void testCountPipelines() throws IOException {
+ Pipeline p;
+
+ // Standalone pipelines: three added, the last one moved to CLOSED
+ map.addPipeline(MockPipeline.createPipeline(1));
+ map.addPipeline(MockPipeline.createPipeline(1));
+ p = MockPipeline.createPipeline(1);
+ map.addPipeline(p);
+ map.updatePipelineState(p.getId(), Pipeline.PipelineState.CLOSED);
+
+ // Ratis pipelines: two added, the last one moved to CLOSED
+ map.addPipeline(MockPipeline.createRatisPipeline());
+ p = MockPipeline.createRatisPipeline();
+ map.addPipeline(p);
+ map.updatePipelineState(p.getId(), Pipeline.PipelineState.CLOSED);
+
+ // EC (3, 2) pipelines: three added, the last one moved to CLOSED
+ map.addPipeline(MockPipeline.createEcPipeline(
+ new ECReplicationConfig(3, 2)));
+ map.addPipeline(MockPipeline.createEcPipeline(
+ new ECReplicationConfig(3, 2)));
+ p = MockPipeline.createEcPipeline(new ECReplicationConfig(3, 2));
+ map.addPipeline(p);
+ map.updatePipelineState(p.getId(), Pipeline.PipelineState.CLOSED);
+
+ assertEquals(2, map.getPipelineCount(new StandaloneReplicationConfig(ONE),
+ Pipeline.PipelineState.OPEN));
+ assertEquals(1, map.getPipelineCount(new RatisReplicationConfig(THREE),
+ Pipeline.PipelineState.OPEN));
+ assertEquals(2, map.getPipelineCount(new ECReplicationConfig(3, 2),
+ Pipeline.PipelineState.OPEN));
+
+ assertEquals(0, map.getPipelineCount(new ECReplicationConfig(6, 3),
+ Pipeline.PipelineState.OPEN));
+
+ assertEquals(1, map.getPipelineCount(new StandaloneReplicationConfig(ONE),
+ Pipeline.PipelineState.CLOSED));
+ assertEquals(1, map.getPipelineCount(new RatisReplicationConfig(THREE),
+ Pipeline.PipelineState.CLOSED));
+ assertEquals(1, map.getPipelineCount(new ECReplicationConfig(3, 2),
+ Pipeline.PipelineState.CLOSED));
+ }
+
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
new file mode 100644
index 0000000..ceaec5f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestWritableECContainerProvider.java
@@ -0,0 +1,372 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
+import org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.HealthyPipelineChoosePolicy;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.Set;
+
+import static org.apache.hadoop.hdds.conf.StorageUnit.BYTES;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
+import static org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.OPEN;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests to validate the WritableECContainerProvider works correctly.
+ */
+public class TestWritableECContainerProvider {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TestWritableECContainerProvider.class);
+ private static final String OWNER = "SCM";
+ private PipelineManager pipelineManager;
+ private ContainerManagerV2 containerManager
+ = Mockito.mock(ContainerManagerV2.class);
+ private PipelineChoosePolicy pipelineChoosingPolicy
+ = new HealthyPipelineChoosePolicy();
+
+ private ConfigurationSource conf;
+ private WritableContainerProvider provider;
+ private ReplicationConfig repConfig;
+ private int minPipelines;
+
+ private Map<ContainerID, ContainerInfo> containers;
+
+ @Before
+ public void setup() throws ContainerNotFoundException {
+ conf = new OzoneConfiguration();
+ repConfig = new ECReplicationConfig(3, 2);
+ containers = new HashMap<>();
+ pipelineManager = MockPipelineManager.getInstance();
+ minPipelines = conf.getObject(WritableECContainerProvider
+ .WritableECContainerProviderConfig.class).getMinimumPipelines();
+ provider = new WritableECContainerProvider(
+ conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+ // getMatchingContainer builds a new container on the pipeline passed as
+ // the third argument, registers it with the pipeline manager and records
+ // it in the local map.
+ Mockito.doAnswer(invocation -> {
+ Pipeline target = (Pipeline)invocation.getArguments()[2];
+ ContainerInfo created =
+ createContainer(target, repConfig, System.nanoTime());
+ pipelineManager.addContainerToPipeline(
+ target.getId(), created.containerID());
+ containers.put(created.containerID(), created);
+ return created;
+ }).when(containerManager).getMatchingContainer(Matchers.anyLong(),
+ Matchers.anyString(), Matchers.any(Pipeline.class));
+
+ // getContainer is answered from the local map of created containers.
+ Mockito.doAnswer(invocation -> {
+ ContainerID requested = (ContainerID)invocation.getArguments()[0];
+ return containers.get(requested);
+ }).when(containerManager).getContainer(Matchers.any(ContainerID.class));
+
+ }
+
+ @Test
+ public void testPipelinesCreatedUpToMinLimitAndRandomPipelineReturned()
+ throws IOException {
+ // The first minPipelines calls should each return a different container
+ Set<ContainerInfo> allocatedContainers = new HashSet<>();
+ for (int i=0; i<minPipelines; i++) {
+ ContainerInfo container =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertFalse(allocatedContainers.contains(container));
+ allocatedContainers.add(container);
+ }
+
+ allocatedContainers.clear();
+ for (int i=0; i<20; i++) {
+ ContainerInfo container =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ allocatedContainers.add(container);
+ }
+ // Should have minPipelines pipelines created
+ assertEquals(minPipelines,
+ pipelineManager.getPipelines(repConfig, OPEN).size());
+ // We should have more than 2 allocatedContainers in the set, showing that a
+ // random container is selected each time. Do not check for minPipelines
+ // here as there is a reasonable chance 20 turns do not pick all of them.
+ assertTrue(allocatedContainers.size() > 2);
+ }
+
+ @Test
+ public void testPiplineLimitIgnoresExcludedPipelines() throws IOException {
+ Set<ContainerInfo> allocatedContainers = new HashSet<>();
+ for (int i=0; i<minPipelines; i++) {
+ ContainerInfo container = provider.getContainer(
+ 1, repConfig, OWNER, new ExcludeList());
+ allocatedContainers.add(container);
+ }
+ // We have the min limit of pipelines, but then exclude one. It should use
+ // one of the existing rather than creating a new one, as the limit is
+ // checked against all pipelines, not just the filtered list
+ ExcludeList exclude = new ExcludeList();
+ PipelineID excludedID = allocatedContainers
+ .stream().findFirst().get().getPipelineID();
+ exclude.addPipeline(excludedID);
+
+ ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
+ assertNotEquals(excludedID, c.getPipelineID());
+ assertTrue(allocatedContainers.contains(c));
+ }
+
+ @Test
+ public void testNewPipelineCreatedIfAllPipelinesExcluded()
+ throws IOException {
+ Set<ContainerInfo> allocatedContainers = new HashSet<>();
+ for (int i=0; i<minPipelines; i++) {
+ ContainerInfo container = provider.getContainer(
+ 1, repConfig, OWNER, new ExcludeList());
+ allocatedContainers.add(container);
+ }
+ // We have the min limit of pipelines, but then exclude all of them. None
+ // of the existing pipelines can be used, so the container returned must
+ // not be one of those allocated above
+ ExcludeList exclude = new ExcludeList();
+ for (ContainerInfo c : allocatedContainers) {
+ exclude.addPipeline(c.getPipelineID());
+ }
+ ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
+ assertFalse(allocatedContainers.contains(c));
+ }
+
+ @Test
+ public void testNewPipelineCreatedIfAllContainersExcluded()
+ throws IOException {
+ Set<ContainerInfo> allocatedContainers = new HashSet<>();
+ for (int i=0; i<minPipelines; i++) {
+ ContainerInfo container = provider.getContainer(
+ 1, repConfig, OWNER, new ExcludeList());
+ allocatedContainers.add(container);
+ }
+ // We have the min limit of pipelines, but then exclude every container
+ // allocated on them. None of those containers can be used, so the
+ // container returned must not be one of those allocated above
+ ExcludeList exclude = new ExcludeList();
+ for (ContainerInfo c : allocatedContainers) {
+ exclude.addConatinerId(c.containerID());
+ }
+ ContainerInfo c = provider.getContainer(1, repConfig, OWNER, exclude);
+ assertFalse(allocatedContainers.contains(c));
+ }
+
+ @Test
+ public void testUnableToCreateAnyPipelinesReturnsNull() throws IOException {
+ // Pipeline manager on which every pipeline creation attempt fails.
+ pipelineManager = new MockPipelineManager() {
+ @Override
+ public Pipeline createPipeline(ReplicationConfig repConf)
+ throws IOException {
+ throw new IOException("Cannot create pipelines");
+ }
+ };
+ provider = new WritableECContainerProvider(
+ conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+ // No pipeline exists and none can be created, so null is expected.
+ ContainerInfo result =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertNull(result);
+ }
+
+ @Test
+ public void testExistingPipelineReturnedWhenNewCannotBeCreated()
+ throws IOException {
+ // Pipeline manager that creates exactly one pipeline and then fails every
+ // later creation attempt.
+ pipelineManager = new MockPipelineManager() {
+
+ private boolean throwError = false;
+
+ @Override
+ public Pipeline createPipeline(ReplicationConfig repConf)
+ throws IOException {
+ if (throwError) {
+ throw new IOException("Cannot create pipelines");
+ }
+ throwError = true;
+ // Forward the caller's argument, not the enclosing test's field.
+ return super.createPipeline(repConf);
+ }
+ };
+ provider = new WritableECContainerProvider(
+ conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+ ContainerInfo container =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ // No further pipeline can be created, so every later request is expected
+ // to return the container from the first call.
+ for (int i=0; i<5; i++) {
+ ContainerInfo nextContainer =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertEquals(container, nextContainer);
+ }
+ }
+
+ @Test
+ public void testNewContainerAllocatedAndPipelinesClosedIfNoSpaceInExisting()
+ throws IOException {
+ Set<ContainerInfo> allocatedContainers = new HashSet<>();
+ for (int i=0; i<minPipelines; i++) {
+ ContainerInfo container =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertFalse(allocatedContainers.contains(container));
+ allocatedContainers.add(container);
+ }
+ // Update all the containers to make them full
+ for (ContainerInfo c : allocatedContainers) {
+ c.setUsedBytes(getMaxContainerSize());
+ }
+ // Get a new container and ensure it is not one of the original set
+ ContainerInfo newContainer =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertNotNull(newContainer);
+ assertFalse(allocatedContainers.contains(newContainer));
+ // The original pipelines should all be closed
+ for (ContainerInfo c : allocatedContainers) {
+ Pipeline pipeline = pipelineManager.getPipeline(c.getPipelineID());
+ assertEquals(CLOSED, pipeline.getPipelineState());
+ }
+ }
+
+ @Test
+ public void testPipelineNotFoundWhenAttemptingToUseExisting()
+ throws IOException {
+ // Ensure the pipeline manager throws PipelineNotFoundException when we ask
+ // for the containers in the pipeline
+ pipelineManager = new MockPipelineManager() {
+
+ @Override
+ public NavigableSet<ContainerID> getContainersInPipeline(
+ PipelineID pipelineID) throws IOException {
+ throw new PipelineNotFoundException("Simulated exception");
+ }
+ };
+ provider = new WritableECContainerProvider(
+ conf, pipelineManager, containerManager, pipelineChoosingPolicy);
+
+ Set<ContainerInfo> allocatedContainers = new HashSet<>();
+ for (int i=0; i<minPipelines; i++) {
+ ContainerInfo container =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertFalse(allocatedContainers.contains(container));
+ allocatedContainers.add(container);
+ }
+ // Now attempt to get a container - any attempt to use an existing one will
+ // throw PipelineNotFoundException and then we must allocate a new one
+ ContainerInfo newContainer =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertNotNull(newContainer);
+ assertFalse(allocatedContainers.contains(newContainer));
+ }
+
+ @Test
+ public void testContainerNotFoundWhenAttemptingToUseExisting()
+ throws IOException {
+ Set<ContainerInfo> existing = new HashSet<>();
+ for (int count = 0; count < minPipelines; count++) {
+ ContainerInfo allocated =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertFalse(existing.contains(allocated));
+ existing.add(allocated);
+ }
+
+ // From here on every container lookup fails, so none of the existing
+ // pipelines can be reused
+ Mockito.doAnswer(invocation -> {
+ throw new ContainerNotFoundException();
+ }).when(containerManager).getContainer(Matchers.any(ContainerID.class));
+
+ ContainerInfo fresh =
+ provider.getContainer(1, repConfig, OWNER, new ExcludeList());
+ assertNotNull(fresh);
+ assertFalse(existing.contains(fresh));
+
+ // All of the pipelines that existed before should now be closed
+ for (ContainerInfo previous : existing) {
+ Pipeline.PipelineState state =
+ pipelineManager.getPipeline(previous.getPipelineID())
+ .getPipelineState();
+ assertEquals(CLOSED, state);
+ }
+ }
+
+ @Test
+ public void testPipelineOpenButContainerRemovedFromIt() throws IOException {
+ // This can happen if the container close process is triggered from the DN.
+ // When that happens, CM will change the container state to CLOSING and
+ // remove it from the container list in pipeline Manager.
+ Set<ContainerInfo> allocatedContainers = new HashSet<>();
+ for (int i=0; i<minPipelines; i++) {
+ ContainerInfo container = provider.getContainer(
+ 1, repConfig, OWNER, new ExcludeList());
+ assertFalse(allocatedContainers.contains(container));
+ allocatedContainers.add(container);
+ // Remove the container from the pipeline to simulate closing it
+ pipelineManager.removeContainerFromPipeline(
+ container.getPipelineID(), container.containerID());
+ }
+ ContainerInfo newContainer = provider.getContainer(
+ 1, repConfig, OWNER, new ExcludeList());
+ assertFalse(allocatedContainers.contains(newContainer));
+ for (ContainerInfo c : allocatedContainers) {
+ Pipeline pipeline = pipelineManager.getPipeline(c.getPipelineID());
+ assertEquals(CLOSED, pipeline.getPipelineState());
+ }
+ }
+
+ private ContainerInfo createContainer(Pipeline pipeline,
+ ReplicationConfig repConf, long containerID) {
+ // Build an OPEN, empty container owned by OWNER on the given pipeline.
+ ContainerInfo.Builder builder = new ContainerInfo.Builder();
+ builder.setContainerID(containerID);
+ builder.setOwner(OWNER);
+ builder.setReplicationConfig(repConf);
+ builder.setState(HddsProtos.LifeCycleState.OPEN);
+ builder.setPipelineID(pipeline.getId());
+ builder.setNumberOfKeys(0);
+ builder.setUsedBytes(0);
+ builder.setSequenceId(0);
+ builder.setDeleteTransactionId(0);
+ return builder.build();
+ }
+
+ private long getMaxContainerSize() {
+ return (long)conf.getStorageSize(
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+ ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, BYTES);
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org