You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by sa...@apache.org on 2019/11/25 07:16:51 UTC
[hadoop-ozone] 04/04: HDDS-1569 Support creating multiple pipelines
with same datanode. Contributed by Li Cheng.
This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 32af34f1946705c8260e3e07f439e03ef020733d
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Tue Oct 29 12:46:00 2019 +0800
HDDS-1569 Support creating multiple pipelines with same datanode. Contributed by Li Cheng.
This closes #28
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 10 +-
.../common/src/main/resources/ozone-default.xml | 15 +-
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 4 +
.../ContainerPlacementPolicyFactory.java | 8 +-
.../hdds/scm/node/states/Node2PipelineMap.java | 2 +-
.../scm/pipeline/BackgroundPipelineCreator.java | 1 +
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 89 +++++++---
.../hadoop/hdds/scm/pipeline/PipelineStateMap.java | 5 +-
.../hdds/scm/pipeline/RatisPipelineProvider.java | 136 +++++++++------
.../hdds/scm/pipeline/RatisPipelineUtils.java | 4 +-
.../hdds/scm/pipeline/SCMPipelineManager.java | 13 +-
.../hdds/scm/pipeline/SCMPipelineMetrics.java | 8 +
.../scm/safemode/HealthyPipelineSafeModeRule.java | 34 ++--
.../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 3 +
.../scm/pipeline/TestPipelinePlacementPolicy.java | 15 +-
.../hdds/scm/pipeline/TestPipelineClose.java | 2 +-
.../TestRatisPipelineCreateAndDestroy.java | 24 ++-
.../scm/pipeline/TestRatisPipelineProvider.java | 184 +++++++++++++++++++++
.../hdds/scm/pipeline/TestSCMPipelineManager.java | 17 +-
.../hadoop/hdds/scm/pipeline/TestSCMRestart.java | 5 +-
.../safemode/TestSCMSafeModeWithPipelineRules.java | 3 +
.../org/apache/hadoop/ozone/MiniOzoneCluster.java | 12 ++
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 6 +-
.../ozone/client/rpc/Test2WayCommitInRatis.java | 1 +
.../ozone/client/rpc/TestBlockOutputStream.java | 1 +
.../rpc/TestBlockOutputStreamWithFailures.java | 7 +-
.../hadoop/ozone/client/rpc/TestCommitWatcher.java | 1 +
.../rpc/TestContainerReplicationEndToEnd.java | 5 +-
.../client/rpc/TestContainerStateMachine.java | 5 +-
.../client/rpc/TestDeleteWithSlowFollower.java | 12 +-
.../client/rpc/TestFailureHandlingByClient.java | 4 +-
.../client/rpc/TestHybridPipelineOnDatanode.java | 3 +-
.../ozone/client/rpc/TestKeyInputStream.java | 1 +
.../rpc/TestMultiBlockWritesWithDnFailures.java | 8 +-
.../rpc/TestOzoneClientRetriesOnException.java | 1 +
.../client/rpc/TestOzoneRpcClientAbstract.java | 1 +
.../ozone/client/rpc/TestWatchForCommit.java | 3 +
.../TestCloseContainerByPipeline.java | 5 +
.../TestSCMContainerPlacementPolicyMetrics.java | 1 +
.../hadoop/ozone/scm/node/TestQueryNode.java | 3 +
.../apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java | 3 +
.../hadoop/ozone/freon/TestDataValidate.java | 2 +-
.../ozone/freon/TestFreonWithPipelineDestroy.java | 1 +
43 files changed, 518 insertions(+), 150 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index e6fed5b..17e09c1 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -319,7 +319,15 @@ public final class ScmConfigKeys {
// the max number of pipelines can a single datanode be engaged in.
public static final String OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT =
"ozone.scm.datanode.max.pipeline.engagement";
- public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 5;
+ // Setting to zero by default means this limit doesn't take effect.
+ public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 0;
+
+ // Upper limit for how many pipelines can be created.
+ // Only for test purpose now.
+ public static final String OZONE_SCM_PIPELINE_NUMBER_LIMIT =
+ "ozone.scm.pipeline.number.limit";
+ // Setting to zero by default means this limit doesn't take effect.
+ public static final int OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT = 0;
public static final String
OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 2ad9948..909c692 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -841,10 +841,19 @@
</description>
</property>
<property>
- <name>ozone.scm.datanode.max.pipeline.engagement</name>
- <value>5</value>
+ <name>ozone.scm.datanode.max.pipeline.engagement</name>
+ <value>0</value>
+ <tag>OZONE, SCM, PIPELINE</tag>
+ <description>Max number of pipelines per datanode can be engaged in.
+ </description>
+ </property>
+ <property>
+ <name>ozone.scm.pipeline.number.limit</name>
+ <value>0</value>
<tag>OZONE, SCM, PIPELINE</tag>
- <description>Max number of pipelines per datanode can be engaged in.
+ <description>Upper limit for how many pipelines can be OPEN in SCM.
+ 0 as default means there is no limit. Otherwise, the number is the limit
+ of max amount of pipelines which are OPEN.
</description>
</property>
<property>
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 845bdf1..00ad58a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -196,6 +196,10 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
// TODO: #CLUTIL Remove creation logic when all replication types and
// factors are handled by pipeline creator
pipeline = pipelineManager.createPipeline(type, factor);
+ } catch (SCMException se) {
+ LOG.warn("Pipeline creation failed for type:{} factor:{}. " +
+ "Datanodes may be used up.", type, factor, se);
+ break;
} catch (IOException e) {
LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
"get pipelines call once.", type, factor, e);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
index adaeb87..74431f9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
@@ -43,10 +43,10 @@ public final class ContainerPlacementPolicyFactory {
}
- public static PlacementPolicy getPolicy(Configuration conf,
- final NodeManager nodeManager, NetworkTopology clusterMap,
- final boolean fallback, SCMContainerPlacementMetrics metrics)
- throws SCMException{
+ public static PlacementPolicy getPolicy(
+ Configuration conf, final NodeManager nodeManager,
+ NetworkTopology clusterMap, final boolean fallback,
+ SCMContainerPlacementMetrics metrics) throws SCMException{
final Class<? extends PlacementPolicy> placementClass = conf
.getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
index 714188d..18809ed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
@@ -80,7 +80,7 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
dn2ObjectMap.computeIfPresent(dnId,
(k, v) -> {
v.remove(pipeline.getId());
- return v;
+ return v.isEmpty() ? null : v;
});
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 6873566..6952f74 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -96,6 +96,7 @@ class BackgroundPipelineCreator {
if (scheduler.isClosed()) {
break;
}
+
pipelineManager.createPipeline(type, factor);
} catch (IOException ioe) {
break;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 1983ed6..23eb574 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -52,6 +53,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
static final Logger LOG =
LoggerFactory.getLogger(PipelinePlacementPolicy.class);
private final NodeManager nodeManager;
+ private final PipelineStateManager stateManager;
private final Configuration conf;
private final int heavyNodeCriteria;
@@ -59,15 +61,17 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
* Constructs a pipeline placement with considering network topology,
* load balancing and rack awareness.
*
- * @param nodeManager Node Manager
+ * @param nodeManager NodeManager
+ * @param stateManager PipelineStateManager
* @param conf Configuration
*/
- public PipelinePlacementPolicy(
- final NodeManager nodeManager, final Configuration conf) {
+ public PipelinePlacementPolicy(final NodeManager nodeManager,
+ final PipelineStateManager stateManager, final Configuration conf) {
super(nodeManager, conf);
this.nodeManager = nodeManager;
this.conf = conf;
- heavyNodeCriteria = conf.getInt(
+ this.stateManager = stateManager;
+ this.heavyNodeCriteria = conf.getInt(
ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
}
@@ -76,11 +80,46 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
* Returns true if this node meets the criteria.
*
* @param datanodeDetails DatanodeDetails
+ * @param nodesRequired nodes required count
* @return true if we have enough space.
*/
@VisibleForTesting
- boolean meetCriteria(DatanodeDetails datanodeDetails, long heavyNodeLimit) {
- return (nodeManager.getPipelinesCount(datanodeDetails) <= heavyNodeLimit);
+ boolean meetCriteria(DatanodeDetails datanodeDetails, int nodesRequired) {
+ if (heavyNodeCriteria == 0) {
+ // no limit applied.
+ return true;
+ }
+ // Datanodes from pipeline in some states can also be considered available
+ // for pipeline allocation. Thus the number of these pipeline shall be
+ // deducted from total heaviness calculation.
+ int pipelineNumDeductable = 0;
+ Set<PipelineID> pipelines = nodeManager.getPipelines(datanodeDetails);
+ for (PipelineID pid : pipelines) {
+ Pipeline pipeline;
+ try {
+ pipeline = stateManager.getPipeline(pid);
+ } catch (PipelineNotFoundException e) {
+ LOG.error("Pipeline not found in pipeline state manager during" +
+ " pipeline creation. PipelineID: " + pid +
+ " exception: " + e.getMessage());
+ continue;
+ }
+ if (pipeline != null &&
+ pipeline.getFactor().getNumber() == nodesRequired &&
+ pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
+ pipeline.getPipelineState() == Pipeline.PipelineState.CLOSED) {
+ pipelineNumDeductable++;
+ }
+ }
+ boolean meet = (nodeManager.getPipelinesCount(datanodeDetails)
+ - pipelineNumDeductable) < heavyNodeCriteria;
+ if (!meet) {
+ LOG.info("Pipeline Placement: can't place more pipeline on heavy " +
"datanode： " + datanodeDetails.getUuid().toString() + " Heaviness: " +
+ nodeManager.getPipelinesCount(datanodeDetails) + " limit: " +
+ heavyNodeCriteria);
+ }
+ return meet;
}
/**
@@ -102,18 +141,19 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
if (excludedNodes != null) {
healthyNodes.removeAll(excludedNodes);
}
+ int initialHealthyNodesCount = healthyNodes.size();
String msg;
- if (healthyNodes.size() == 0) {
+ if (initialHealthyNodesCount == 0) {
msg = "No healthy node found to allocate pipeline.";
LOG.error(msg);
throw new SCMException(msg, SCMException.ResultCodes
.FAILED_TO_FIND_HEALTHY_NODES);
}
- if (healthyNodes.size() < nodesRequired) {
+ if (initialHealthyNodesCount < nodesRequired) {
msg = String.format("Not enough healthy nodes to allocate pipeline. %d "
+ " datanodes required. Found %d",
- nodesRequired, healthyNodes.size());
+ nodesRequired, initialHealthyNodesCount);
LOG.error(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -121,14 +161,17 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
// filter nodes that meet the size and pipeline engagement criteria.
// Pipeline placement doesn't take node space left into account.
- List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
- meetCriteria(d, heavyNodeCriteria)).collect(Collectors.toList());
+ List<DatanodeDetails> healthyList = healthyNodes.stream()
+ .filter(d -> meetCriteria(d, nodesRequired)).limit(nodesRequired)
+ .collect(Collectors.toList());
if (healthyList.size() < nodesRequired) {
msg = String.format("Unable to find enough nodes that meet " +
"the criteria that cannot engage in more than %d pipelines." +
- " Nodes required: %d Found: %d",
- heavyNodeCriteria, nodesRequired, healthyList.size());
+ " Nodes required: %d Found: %d, healthy nodes count in " +
+ "NodeManager: %d.",
+ heavyNodeCriteria, nodesRequired, healthyList.size(),
+ initialHealthyNodesCount);
LOG.error(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -154,13 +197,11 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
// and make sure excludedNodes are excluded from list.
List<DatanodeDetails> healthyNodes =
filterViableNodes(excludedNodes, nodesRequired);
-
- // Randomly picks nodes when all nodes are equal.
+
+ // Randomly picks nodes when all nodes are equal or factor is ONE.
// This happens when network topology is absent or
// all nodes are on the same rack.
if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
- LOG.info("All nodes are considered equal. Now randomly pick nodes. " +
- "Required nodes: {}", nodesRequired);
return super.getResultSet(nodesRequired, healthyNodes);
} else {
// Since topology and rack awareness are available, picks nodes
@@ -188,8 +229,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
// First choose an anchor nodes randomly
DatanodeDetails anchor = chooseNode(healthyNodes);
if (anchor == null) {
- LOG.error("Unable to find the first healthy nodes that " +
- "meet the criteria. Required nodes: {}, Found nodes: {}",
+ LOG.error("Pipeline Placement: Unable to find the first healthy nodes " +
+ "that meet the criteria. Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -204,8 +245,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
healthyNodes, exclude,
nodeManager.getClusterNetworkTopologyMap(), anchor);
if (nodeOnDifferentRack == null) {
- LOG.error("Unable to find nodes on different racks that " +
- "meet the criteria. Required nodes: {}, Found nodes: {}",
+ LOG.error("Pipeline Placement: Unable to find nodes on different racks " +
+ " that meet the criteria. Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -228,9 +269,9 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
}
if (results.size() < nodesRequired) {
- LOG.error("Unable to find the required number of healthy nodes that " +
- "meet the criteria. Required nodes: {}, Found nodes: {}",
- nodesRequired, results.size());
+ LOG.error("Pipeline Placement: Unable to find the required number of " +
+ "healthy nodes that meet the criteria. Required nodes: {}, " +
+ "Found nodes: {}", nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 443378c..8e0f32d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -52,8 +53,8 @@ class PipelineStateMap {
PipelineStateMap() {
// TODO: Use TreeMap for range operations?
- pipelineMap = new HashMap<>();
- pipeline2container = new HashMap<>();
+ pipelineMap = new ConcurrentHashMap<>();
+ pipeline2container = new ConcurrentHashMap<>();
query2OpenPipelines = new HashMap<>();
initializeQueryMap();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 913a435..216cb68 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -20,13 +20,12 @@ package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.PlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
import org.apache.hadoop.io.MultipleIOException;
@@ -44,8 +43,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -69,6 +66,9 @@ public class RatisPipelineProvider implements PipelineProvider {
private final NodeManager nodeManager;
private final PipelineStateManager stateManager;
private final Configuration conf;
+ private final PipelinePlacementPolicy placementPolicy;
+ private int pipelineNumberLimit;
+ private int maxPipelinePerDatanode;
// Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
private final int parallelismForPool = 3;
@@ -92,65 +92,93 @@ public class RatisPipelineProvider implements PipelineProvider {
this.stateManager = stateManager;
this.conf = conf;
this.tlsConfig = tlsConfig;
+ this.placementPolicy =
+ new PipelinePlacementPolicy(nodeManager, stateManager, conf);
+ this.pipelineNumberLimit = conf.getInt(
+ ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
+ ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT);
+ this.maxPipelinePerDatanode = conf.getInt(
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
}
+ private List<DatanodeDetails> pickNodesNeverUsed(ReplicationFactor factor)
+ throws SCMException {
+ Set<DatanodeDetails> dnsUsed = new HashSet<>();
+ stateManager.getPipelines(ReplicationType.RATIS, factor)
+ .stream().filter(
+ p -> p.getPipelineState().equals(PipelineState.OPEN) ||
+ p.getPipelineState().equals(PipelineState.DORMANT) ||
+ p.getPipelineState().equals(PipelineState.ALLOCATED))
+ .forEach(p -> dnsUsed.addAll(p.getNodes()));
- /**
- * Create pluggable container placement policy implementation instance.
- *
- * @param nodeManager - SCM node manager.
- * @param conf - configuration.
- * @return SCM container placement policy implementation instance.
- */
- @SuppressWarnings("unchecked")
- // TODO: should we rename PlacementPolicy to PipelinePlacementPolicy?
- private static PlacementPolicy createContainerPlacementPolicy(
- final NodeManager nodeManager, final Configuration conf) {
- Class<? extends PlacementPolicy> implClass =
- (Class<? extends PlacementPolicy>) conf.getClass(
- ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
- SCMContainerPlacementRandom.class);
+ // Get list of healthy nodes
+ List<DatanodeDetails> dns = nodeManager
+ .getNodes(HddsProtos.NodeState.HEALTHY)
+ .parallelStream()
+ .filter(dn -> !dnsUsed.contains(dn))
+ .limit(factor.getNumber())
+ .collect(Collectors.toList());
+ if (dns.size() < factor.getNumber()) {
+ String e = String
+ .format("Cannot create pipeline of factor %d using %d nodes." +
+ " Used %d nodes. Healthy nodes %d", factor.getNumber(),
+ dns.size(), dnsUsed.size(),
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size());
+ throw new SCMException(e,
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+ return dns;
+ }
- try {
- Constructor<? extends PlacementPolicy> ctor =
- implClass.getDeclaredConstructor(NodeManager.class,
- Configuration.class);
- return ctor.newInstance(nodeManager, conf);
- } catch (RuntimeException e) {
- throw e;
- } catch (InvocationTargetException e) {
- throw new RuntimeException(implClass.getName()
- + " could not be constructed.", e.getCause());
- } catch (Exception e) {
-// LOG.error("Unhandled exception occurred, Placement policy will not " +
-// "be functional.");
- throw new IllegalArgumentException("Unable to load " +
- "PlacementPolicy", e);
+ private boolean exceedPipelineNumberLimit(ReplicationFactor factor) {
+ if (factor != ReplicationFactor.THREE) {
+ // Only put limits for Factor THREE pipelines.
+ return false;
+ }
+ // Per datanode limit
+ if (maxPipelinePerDatanode > 0) {
+ return (stateManager.getPipelines(ReplicationType.RATIS, factor).size() -
+ stateManager.getPipelines(ReplicationType.RATIS, factor,
+ Pipeline.PipelineState.CLOSED).size()) > maxPipelinePerDatanode *
+ nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY) /
+ factor.getNumber();
}
+
+ // Global limit
+ if (pipelineNumberLimit > 0) {
+ return (stateManager.getPipelines(ReplicationType.RATIS,
+ ReplicationFactor.THREE).size() - stateManager.getPipelines(
+ ReplicationType.RATIS, ReplicationFactor.THREE,
+ Pipeline.PipelineState.CLOSED).size()) >
+ (pipelineNumberLimit - stateManager.getPipelines(
+ ReplicationType.RATIS, ReplicationFactor.ONE).size());
+ }
+
+ return false;
}
@Override
public Pipeline create(ReplicationFactor factor) throws IOException {
- // Get set of datanodes already used for ratis pipeline
- Set<DatanodeDetails> dnsUsed = new HashSet<>();
- stateManager.getPipelines(ReplicationType.RATIS, factor).stream().filter(
- p -> p.getPipelineState().equals(PipelineState.OPEN) ||
- p.getPipelineState().equals(PipelineState.DORMANT) ||
- p.getPipelineState().equals(PipelineState.ALLOCATED))
- .forEach(p -> dnsUsed.addAll(p.getNodes()));
+ if (exceedPipelineNumberLimit(factor)) {
+ throw new SCMException("Ratis pipeline number meets the limit: " +
+ pipelineNumberLimit + " factor : " +
+ factor.getNumber(),
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
- // Get list of healthy nodes
- List<DatanodeDetails> dns =
- nodeManager.getNodes(NodeState.HEALTHY)
- .parallelStream()
- .filter(dn -> !dnsUsed.contains(dn))
- .limit(factor.getNumber())
- .collect(Collectors.toList());
- if (dns.size() < factor.getNumber()) {
- String e = String
- .format("Cannot create pipeline of factor %d using %d nodes.",
- factor.getNumber(), dns.size());
- throw new InsufficientDatanodesException(e);
+ List<DatanodeDetails> dns;
+
+ switch(factor) {
+ case ONE:
+ dns = pickNodesNeverUsed(ReplicationFactor.ONE);
+ break;
+ case THREE:
+ dns = placementPolicy.chooseDatanodes(null,
+ null, factor.getNumber(), 0);
+ break;
+ default:
+ throw new IllegalStateException("Unknown factor: " + factor.name());
}
Pipeline pipeline = create(factor, dns);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
index 497e717..04393a1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -66,8 +66,8 @@ public final class RatisPipelineUtils {
try {
destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
} catch (IOException e) {
- LOG.warn("Pipeline destroy failed for pipeline={} dn={}",
- pipeline.getId(), dn);
+ LOG.warn("Pipeline destroy failed for pipeline={} dn={} exception={}",
+ pipeline.getId(), dn, e.getMessage());
}
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 0964f6d..b41c595 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -54,10 +54,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import static org.apache.hadoop.hdds.scm
- .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm
- .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;
/**
@@ -97,8 +93,8 @@ public class SCMPipelineManager implements PipelineManager {
scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
this.backgroundPipelineCreator =
new BackgroundPipelineCreator(this, scheduler, conf);
- int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
- OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+ int cacheSize = conf.getInt(ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB,
+ ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
final File metaDir = ServerUtils.getScmDbDir(conf);
final File pipelineDBPath = new File(metaDir, SCM_PIPELINE_DB);
this.pipelineStore =
@@ -160,10 +156,9 @@ public class SCMPipelineManager implements PipelineManager {
metrics.incNumPipelineCreated();
metrics.createPerPipelineMetrics(pipeline);
return pipeline;
- } catch (InsufficientDatanodesException idEx) {
- throw idEx;
} catch (IOException ex) {
metrics.incNumPipelineCreationFailed();
+ LOG.error("Pipeline creation failed.", ex);
throw ex;
} finally {
lock.writeLock().unlock();
@@ -172,7 +167,7 @@ public class SCMPipelineManager implements PipelineManager {
@Override
public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
- List<DatanodeDetails> nodes) {
+ List<DatanodeDetails> nodes) {
// This will mostly be used to create dummy pipeline for SimplePipelines.
// We don't update the metrics for SimplePipelines.
lock.writeLock().lock();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index b6a1445..9427391 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -125,6 +125,14 @@ public final class SCMPipelineMetrics implements MetricsSource {
}
/**
+ * Get the number of pipeline created.
+ * @return number of pipeline
+ */
+ long getNumPipelineCreated() {
+ return numPipelineCreated.value();
+ }
+
+ /**
* Increments number of failed pipeline creation count.
*/
void incNumPipelineCreationFailed() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 2f9a66f..3b31454 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -17,8 +17,8 @@
*/
package org.apache.hadoop.hdds.scm.safemode;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
@@ -54,8 +54,9 @@ public class HealthyPipelineSafeModeRule
private final PipelineManager pipelineManager;
private int healthyPipelineThresholdCount;
private int currentHealthyPipelineCount = 0;
- private final Map<PipelineID, Boolean> processedPipelines = new HashMap<>();
private final double healthyPipelinesPercent;
+ private final Set<PipelineID> processedPipelineIDs =
+ new HashSet<>();
HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
PipelineManager pipelineManager,
@@ -121,29 +122,34 @@ public class HealthyPipelineSafeModeRule
// from datanode again during threshold calculation.
Preconditions.checkNotNull(pipelineReportFromDatanode);
+ Pipeline pipeline;
PipelineReportsProto pipelineReport =
pipelineReportFromDatanode.getReport();
for (PipelineReport report : pipelineReport.getPipelineReportList()) {
- PipelineID pipelineID = PipelineID.getFromProtobuf(
- report.getPipelineID());
- Pipeline pipeline;
+ PipelineID pipelineID = PipelineID
+ .getFromProtobuf(report.getPipelineID());
+ if (processedPipelineIDs.contains(pipelineID)) {
+ continue;
+ }
+
try {
pipeline = pipelineManager.getPipeline(pipelineID);
} catch (PipelineNotFoundException e) {
continue;
}
- if (!processedPipelines.containsKey(pipelineID)) {
- if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+ if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
report.getIsLeader()) {
- // If the pipeline gets reported with a leader we mark it as healthy
- currentHealthyPipelineCount++;
- getSafeModeMetrics().incCurrentHealthyPipelinesCount();
- processedPipelines.put(pipelineID, Boolean.TRUE);
- }
+ // If the pipeline gets reported with a leader we mark it as healthy
+ // for this pipeline.
+ currentHealthyPipelineCount++;
+ getSafeModeMetrics().incCurrentHealthyPipelinesCount();
+ processedPipelineIDs.add(pipelineID);
}
+
}
+
if (scmInSafeMode()) {
SCMSafeModeManager.getLogger().info(
"SCM in safe mode. Healthy pipelines reported count is {}, " +
@@ -154,7 +160,7 @@ public class HealthyPipelineSafeModeRule
@Override
protected void cleanup() {
- processedPipelines.clear();
+ processedPipelineIDs.clear();
}
@VisibleForTesting
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index c140119..7e88043 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -65,6 +65,8 @@ import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Test DeadNodeHandler.
*/
@@ -85,6 +87,7 @@ public class TestDeadNodeHandler {
storageDir = GenericTestUtils.getTempPath(
TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
eventQueue = new EventQueue();
scm = HddsTestUtils.getScm(conf);
nodeManager = (SCMNodeManager) scm.getScmNodeManager();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 2e0d0b1..1e34039 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -34,11 +34,14 @@ import org.junit.Test;
import java.util.*;
import java.util.stream.Collectors;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Test for PipelinePlacementPolicy.
*/
public class TestPipelinePlacementPolicy {
private MockNodeManager nodeManager;
+ private OzoneConfiguration conf;
private PipelinePlacementPolicy placementPolicy;
private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
@@ -46,8 +49,10 @@ public class TestPipelinePlacementPolicy {
public void init() throws Exception {
nodeManager = new MockNodeManager(true,
PIPELINE_PLACEMENT_MAX_NODES_COUNT);
- placementPolicy =
- new PipelinePlacementPolicy(nodeManager, new OzoneConfiguration());
+ conf = new OzoneConfiguration();
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
+ placementPolicy = new PipelinePlacementPolicy(
+ nodeManager, new PipelineStateManager(conf), conf);
}
@Test
@@ -123,7 +128,7 @@ public class TestPipelinePlacementPolicy {
public void testHeavyNodeShouldBeExcluded() throws SCMException{
List<DatanodeDetails> healthyNodes =
nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
- int nodesRequired = healthyNodes.size()/2;
+ int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
// only minority of healthy NODES are heavily engaged in pipelines.
int minorityHeavy = healthyNodes.size()/2 - 1;
List<DatanodeDetails> pickedNodes1 = placementPolicy.chooseDatanodes(
@@ -179,7 +184,9 @@ public class TestPipelinePlacementPolicy {
}
int considerHeavyCount =
- ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT + 1;
+ conf.getInt(
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT) + 1;
Node2PipelineMap mockMap = new Node2PipelineMap();
for (DatanodeDetails node : nodes) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index c583559..9bccb1a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -169,7 +169,7 @@ public class TestPipelineClose {
new PipelineActionHandler(pipelineManager, conf);
pipelineActionHandler
.onMessage(pipelineActionsFromDatanode, new EventQueue());
- Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 1.2));
+ Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 10));
OzoneContainer ozoneContainer =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index 6ace90c..cbe450e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -34,6 +35,7 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
@@ -48,9 +50,12 @@ public class TestRatisPipelineCreateAndDestroy {
public void init(int numDatanodes) throws Exception {
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
GenericTestUtils.getRandomizedTempPath());
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numDatanodes)
- .setHbInterval(1000)
+ .setTotalPipelineNumLimit(numDatanodes + numDatanodes/3)
+ .setHbInterval(2000)
.setHbProcessorInterval(1000)
.build();
cluster.waitForClusterToBeReady();
@@ -103,7 +108,9 @@ public class TestRatisPipelineCreateAndDestroy {
} catch (IOException ioe) {
// As now all datanodes are shutdown, they move to stale state, there
// will be no sufficient datanodes to create the pipeline.
- Assert.assertTrue(ioe instanceof InsufficientDatanodesException);
+ Assert.assertTrue(ioe instanceof SCMException);
+ Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
+ ((SCMException) ioe).getResult());
}
// make sure pipelines is destroyed
@@ -116,9 +123,14 @@ public class TestRatisPipelineCreateAndDestroy {
for (Pipeline pipeline : pipelines) {
pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
}
- // make sure pipelines is created after node start
- pipelineManager.triggerPipelineCreation();
- waitForPipelines(1);
+
+ if (cluster.getStorageContainerManager()
+ .getScmNodeManager().getNodeCount(HddsProtos.NodeState.HEALTHY) >=
+ HddsProtos.ReplicationFactor.THREE.getNumber()) {
+ // make sure pipelines are created after the nodes start
+ pipelineManager.triggerPipelineCreation();
+ waitForPipelines(1);
+ }
}
private void waitForPipelines(int numPipelines)
@@ -126,6 +138,6 @@ public class TestRatisPipelineCreateAndDestroy {
GenericTestUtils.waitFor(() -> pipelineManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
- .size() == numPipelines, 100, 40000);
+ .size() >= numPipelines, 100, 40000);
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
new file mode 100644
index 0000000..7862605
--- /dev/null
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
+/**
+ * Test for RatisPipelineProvider.
+ */
+public class TestRatisPipelineProvider {
+
+ private NodeManager nodeManager;
+ private PipelineProvider provider;
+ private PipelineStateManager stateManager;
+
+ @Before
+ public void init() throws Exception {
+ nodeManager = new MockNodeManager(true, 10);
+ stateManager = new PipelineStateManager(new OzoneConfiguration());
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
+ provider = new MockRatisPipelineProvider(nodeManager,
+ stateManager, conf);
+ }
+
+ private void createPipelineAndAssertions(
+ HddsProtos.ReplicationFactor factor) throws IOException {
+ Pipeline pipeline = provider.create(factor);
+ stateManager.addPipeline(pipeline);
+ nodeManager.addPipeline(pipeline);
+ Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+ Assert.assertEquals(pipeline.getFactor(), factor);
+ Assert.assertEquals(pipeline.getPipelineState(),
+ Pipeline.PipelineState.OPEN);
+ Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+ Pipeline pipeline1 = provider.create(factor);
+ stateManager.addPipeline(pipeline1);
+ nodeManager.addPipeline(pipeline1);
+ Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
+ Assert.assertEquals(pipeline1.getFactor(), factor);
+ Assert.assertEquals(pipeline1.getPipelineState(),
+ Pipeline.PipelineState.OPEN);
+ Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
+ }
+
+ @Test
+ public void testCreatePipelineWithFactor() throws IOException {
+ HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+ Pipeline pipeline = provider.create(factor);
+ stateManager.addPipeline(pipeline);
+ nodeManager.addPipeline(pipeline);
+ Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+ Assert.assertEquals(pipeline.getFactor(), factor);
+ Assert.assertEquals(pipeline.getPipelineState(),
+ Pipeline.PipelineState.OPEN);
+ Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+
+ factor = HddsProtos.ReplicationFactor.ONE;
+ Pipeline pipeline1 = provider.create(factor);
+ stateManager.addPipeline(pipeline1);
+ nodeManager.addPipeline(pipeline1);
+ Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
+ Assert.assertEquals(pipeline1.getFactor(), factor);
+ Assert.assertEquals(pipeline1.getPipelineState(),
+ Pipeline.PipelineState.OPEN);
+ Assert.assertEquals(pipeline1.getNodes().size(), factor.getNumber());
+ }
+
+ @Test
+ public void testCreatePipelineWithFactorThree() throws IOException {
+ createPipelineAndAssertions(HddsProtos.ReplicationFactor.THREE);
+ }
+
+ @Test
+ public void testCreatePipelineWithFactorOne() throws IOException {
+ createPipelineAndAssertions(HddsProtos.ReplicationFactor.ONE);
+ }
+
+ private List<DatanodeDetails> createListOfNodes(int nodeCount) {
+ List<DatanodeDetails> nodes = new ArrayList<>();
+ for (int i = 0; i < nodeCount; i++) {
+ nodes.add(TestUtils.randomDatanodeDetails());
+ }
+ return nodes;
+ }
+
+ @Test
+ public void testCreatePipelineWithNodes() {
+ HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+ Pipeline pipeline =
+ provider.create(factor, createListOfNodes(factor.getNumber()));
+ Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+ Assert.assertEquals(pipeline.getFactor(), factor);
+ Assert.assertEquals(
+ pipeline.getPipelineState(), Pipeline.PipelineState.OPEN);
+ Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+
+ factor = HddsProtos.ReplicationFactor.ONE;
+ pipeline = provider.create(factor, createListOfNodes(factor.getNumber()));
+ Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+ Assert.assertEquals(pipeline.getFactor(), factor);
+ Assert.assertEquals(pipeline.getPipelineState(),
+ Pipeline.PipelineState.OPEN);
+ Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+ }
+
+ @Test
+ public void testCreatePipelinesDnExclude() throws IOException {
+ List<DatanodeDetails> allHealthyNodes =
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
+ int totalHealthyNodesCount = allHealthyNodes.size();
+
+ HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
+
+ List<DatanodeDetails> closePipelineDns = new ArrayList<>();
+ for (int i = 0; i < totalHealthyNodesCount/3; i++) {
+ List<DatanodeDetails> pipelineDns = allHealthyNodes
+ .subList(3 * i, 3 * (i + 1));
+
+ Pipeline.PipelineState state;
+ if (i % 2 == 0) {
+ state = Pipeline.PipelineState.OPEN;
+ } else {
+ state = Pipeline.PipelineState.CLOSED;
+ closePipelineDns.addAll(pipelineDns);
+ }
+
+ Pipeline openPipeline = Pipeline.newBuilder()
+ .setType(HddsProtos.ReplicationType.RATIS)
+ .setFactor(factor)
+ .setNodes(pipelineDns)
+ .setState(state)
+ .setId(PipelineID.randomId())
+ .build();
+
+
+ stateManager.addPipeline(openPipeline);
+ nodeManager.addPipeline(openPipeline);
+ }
+
+ Pipeline pipeline = provider.create(factor);
+ Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
+ Assert.assertEquals(pipeline.getFactor(), factor);
+ Assert.assertEquals(pipeline.getPipelineState(),
+ Pipeline.PipelineState.OPEN);
+ Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
+ List<DatanodeDetails> pipelineNodes = pipeline.getNodes();
+
+ // Since we have only 10 DNs, at least 1 pipeline node should have been
+ // from the closed pipeline DN list.
+ Assert.assertTrue(pipelineNodes.parallelStream().filter(
+ closePipelineDns::contains).count() > 0);
+ }
+}
\ No newline at end of file
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 105d2e2..7aba39a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.pipeline;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@@ -34,12 +35,13 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.PipelineReportFromDatanode;
-import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
@@ -59,6 +61,7 @@ public class TestSCMPipelineManager {
@Before
public void setUp() throws Exception {
conf = new OzoneConfiguration();
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
testDir = GenericTestUtils
.getTestDir(TestSCMPipelineManager.class.getSimpleName());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
@@ -188,10 +191,10 @@ public class TestSCMPipelineManager {
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
Assert.fail();
- } catch (InsufficientDatanodesException idEx) {
- Assert.assertEquals(
- "Cannot create pipeline of factor 3 using 1 nodes.",
- idEx.getMessage());
+ } catch (SCMException ioe) {
+ // pipeline creation failed this time.
+ Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
+ ioe.getResult());
}
metrics = getMetrics(
@@ -201,7 +204,7 @@ public class TestSCMPipelineManager {
numPipelineCreateFailed = getLongCounter(
"NumPipelineCreationFailed", metrics);
- Assert.assertTrue(numPipelineCreateFailed == 0);
+ Assert.assertTrue(numPipelineCreateFailed == 1);
// clean up
pipelineManager.close();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
index 459a67a..baeee6a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
@@ -57,8 +57,11 @@ public class TestSCMRestart {
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
+ int numOfNodes = 4;
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(4)
+ .setNumDatanodes(numOfNodes)
+ // allow only one FACTOR THREE pipeline.
+ .setTotalPipelineNumLimit(numOfNodes + 1)
.setHbInterval(1000)
.setHbProcessorInterval(1000)
.build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
index 7cfd555..09c633d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
@@ -38,6 +38,7 @@ import org.junit.rules.TemporaryFolder;
import java.util.List;
import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.junit.Assert.fail;
/**
@@ -62,6 +63,8 @@ public class TestSCMSafeModeWithPipelineRules {
true);
conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "10s");
conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s");
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 50);
+
clusterBuilder = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numDatanodes)
.setHbInterval(1000)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 59cef37..5784196 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -238,6 +238,7 @@ public interface MiniOzoneCluster {
protected static final int DEFAULT_HB_INTERVAL_MS = 1000;
protected static final int DEFAULT_HB_PROCESSOR_INTERVAL_MS = 100;
protected static final int ACTIVE_OMS_NOT_SET = -1;
+ protected static final int DEFAULT_PIPELIME_LIMIT = 3;
protected final OzoneConfiguration conf;
protected String path;
@@ -265,6 +266,7 @@ public interface MiniOzoneCluster {
protected int numOfDatanodes = 1;
protected boolean startDataNodes = true;
protected CertificateClient certClient;
+ protected int pipelineNumLimit = DEFAULT_PIPELIME_LIMIT;
protected Builder(OzoneConfiguration conf) {
this.conf = conf;
@@ -352,6 +354,16 @@ public interface MiniOzoneCluster {
}
/**
+ * Sets the upper limit on the total number of pipelines.
+ * @param val maximum number of pipelines
+ * @return MiniOzoneCluster.Builder
+ */
+ public Builder setTotalPipelineNumLimit(int val) {
+ pipelineNumLimit = val;
+ return this;
+ }
+
+ /**
* Sets the number of HeartBeat Interval of Datanodes, the value should be
* in MilliSeconds.
*
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index 2813711..f9938c5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -531,6 +531,10 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
streamBufferMaxSize.get(), streamBufferSizeUnit.get());
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(),
streamBufferSizeUnit.get());
+ // MiniOzoneCluster should have a global upper limit on pipelines.
+ conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
+ pipelineNumLimit == DEFAULT_PIPELIME_LIMIT ?
+ 2 * numOfDatanodes : pipelineNumLimit);
configureTrace();
}
@@ -542,7 +546,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
* Creates a new StorageContainerManager instance.
*
* @return {@link StorageContainerManager}
- *
+ *
* @throws IOException
*/
StorageContainerManager createSCM()
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
index fd2cea3..1eef382 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
@@ -81,6 +81,7 @@ public class Test2WayCommitInRatis {
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setTotalPipelineNumLimit(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 399b977..444f362 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -84,6 +84,7 @@ public class TestBlockOutputStream {
StorageUnit.MB);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setTotalPipelineNumLimit(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 8649837..3bcd81f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
@@ -89,9 +90,11 @@ public class TestBlockOutputStreamWithFailures {
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
+
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7)
- .setBlockSize(blockSize).setChunkSize(chunkSize)
- .setStreamBufferFlushSize(flushSize)
+ .setTotalPipelineNumLimit(10).setBlockSize(blockSize)
+ .setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
.setStreamBufferMaxSize(maxFlushSize)
.setStreamBufferSizeUnit(StorageUnit.BYTES).build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
index 344c51e..d76155b 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -96,6 +96,7 @@ public class TestCommitWatcher {
StorageUnit.MB);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setTotalPipelineNumLimit(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
index 0886d26..a8f61f7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
@@ -56,6 +56,7 @@ import java.util.function.Predicate;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
/**
* Tests delete key operation with a slow follower in the datanode
@@ -99,10 +100,12 @@ public class TestContainerReplicationEndToEnd {
1000, TimeUnit.SECONDS);
conf.setLong("hdds.scm.replication.thread.interval",
containerReportInterval);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
conf.setQuietMode(false);
cluster =
- MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).setHbInterval(200)
+ MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4)
+ .setTotalPipelineNumLimit(6).setHbInterval(200)
.build();
cluster.waitForClusterToBeReady();
cluster.getStorageContainerManager().getReplicationManager().start();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 82c4910..3b806dd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -52,8 +52,7 @@ import java.util.concurrent.TimeUnit;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_COMMAND_STATUS_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
/**
* Tests the containerStateMachine failure handling.
@@ -82,7 +81,7 @@ public class TestContainerStateMachine {
baseDir.mkdirs();
conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
- // conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+ // conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
index 30c2624..644469e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
@@ -80,6 +80,7 @@ public class TestDeleteWithSlowFollower {
private static String bucketName;
private static String path;
private static XceiverClientManager xceiverClientManager;
+ private static final int FACTOR_THREE_PIPELINE_COUNT = 1;
/**
* Create a MiniDFSCluster for testing.
@@ -111,10 +112,13 @@ public class TestDeleteWithSlowFollower {
1000, TimeUnit.SECONDS);
conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
1, TimeUnit.SECONDS);
-
conf.setQuietMode(false);
- cluster =
- MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(100)
+ int numOfDatanodes = 3;
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(numOfDatanodes)
+ .setTotalPipelineNumLimit(
+ numOfDatanodes + FACTOR_THREE_PIPELINE_COUNT)
+ .setHbInterval(100)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
@@ -176,7 +180,7 @@ public class TestDeleteWithSlowFollower {
cluster.getStorageContainerManager().getPipelineManager()
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
- Assert.assertTrue(pipelineList.size() == 1);
+ Assert.assertTrue(pipelineList.size() >= FACTOR_THREE_PIPELINE_COUNT);
Pipeline pipeline = pipelineList.get(0);
for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
if (ContainerTestHelper.isRatisFollower(dn, pipeline)) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index edb796b..7b6d555 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -97,6 +98,7 @@ public class TestFailureHandlingByClient {
1, TimeUnit.SECONDS);
conf.setBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
conf.setQuietMode(false);
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
@@ -105,7 +107,7 @@ public class TestFailureHandlingByClient {
Collections.singleton(HddsUtils.getHostName(conf))).get(0),
"/rack1");
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(10).build();
+ .setNumDatanodes(10).setTotalPipelineNumLimit(15).build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
index 47a716e..75af061 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
@@ -67,7 +67,8 @@ public class TestHybridPipelineOnDatanode {
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
- cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3)
+ .setTotalPipelineNumLimit(5).build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index 6e7e328..9e19f1c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -82,6 +82,7 @@ public class TestKeyInputStream {
StorageUnit.MB);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
+ .setTotalPipelineNumLimit(5)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
index 9666247..64047ba 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
@@ -47,8 +47,7 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.*;
/**
* Tests MultiBlock Writes with Dn failures by Ozone Client.
@@ -87,10 +86,13 @@ public class TestMultiBlockWritesWithDnFailures {
conf.setTimeDuration(
OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
1, TimeUnit.SECONDS);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(datanodes).build();
+ .setNumDatanodes(datanodes)
+ .setTotalPipelineNumLimit(0)
+ .build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
index 5f6d494..0bc94d0 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -91,6 +91,7 @@ public class TestOzoneClientRetriesOnException {
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setTotalPipelineNumLimit(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 2163773..fd2800f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -165,6 +165,7 @@ public abstract class TestOzoneRpcClientAbstract {
static void startCluster(OzoneConfiguration conf) throws Exception {
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
+ .setTotalPipelineNumLimit(10)
.setScmId(scmId)
.build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index a5d601e..9b7ed70 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -56,6 +56,7 @@ import java.util.concurrent.TimeoutException;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
/**
* This class verifies the watchForCommit Handling by xceiverClient.
@@ -92,10 +93,12 @@ public class TestWatchForCommit {
conf.setTimeDuration(
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
1, TimeUnit.SECONDS);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setTotalPipelineNumLimit(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index b676e1c..3cafb7d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -52,6 +52,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Test container closing.
*/
@@ -73,8 +75,11 @@ public class TestCloseContainerByPipeline {
public static void init() throws Exception {
conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, "1");
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(10)
+ .setTotalPipelineNumLimit(15)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
index 536d807..e677544 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
@@ -82,6 +82,7 @@ public class TestSCMContainerPlacementPolicyMetrics {
"/rack1");
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(4)
+ .setTotalPipelineNumLimit(10)
.build();
cluster.waitForClusterToBeReady();
metrics = getMetrics(SCMContainerPlacementMetrics.class.getSimpleName());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
index ecc2b3e..a882dcd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
@@ -16,6 +16,7 @@
*/
package org.apache.hadoop.ozone.scm.node;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -76,9 +77,11 @@ public class TestQueryNode {
conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numOfDatanodes)
+ .setTotalPipelineNumLimit(numOfDatanodes + numOfDatanodes/2)
.build();
cluster.waitForClusterToBeReady();
scmClient = new ContainerOperationClient(conf);
diff --git a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
index fcabc67..b3bbe3b 100644
--- a/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
+++ b/hadoop-ozone/ozonefs/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
@@ -97,6 +97,7 @@ public class TestOzoneFsHAURLs {
conf.setTimeDuration(
OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
OMStorage omStore = new OMStorage(conf);
omStore.setClusterId(clusterId);
@@ -106,6 +107,8 @@ public class TestOzoneFsHAURLs {
// Start the cluster
cluster = MiniOzoneCluster.newHABuilder(conf)
+ .setNumDatanodes(7)
+ .setTotalPipelineNumLimit(10)
.setClusterId(clusterId)
.setScmId(scmId)
.setOMServiceId(omServiceId)
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
index fdcb822..3e1c826 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
@@ -42,7 +42,7 @@ public abstract class TestDataValidate {
static void startCluster(OzoneConfiguration conf) throws Exception {
conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(5).build();
+ .setNumDatanodes(5).setTotalPipelineNumLimit(8).build();
cluster.waitForClusterToBeReady();
}
diff --git a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
index 13ecab6..bd30d4e 100644
--- a/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
+++ b/hadoop-ozone/tools/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
@@ -53,6 +53,7 @@ public class TestFreonWithPipelineDestroy {
.setHbProcessorInterval(1000)
.setHbInterval(1000)
.setNumDatanodes(3)
+ .setTotalPipelineNumLimit(8)
.build();
cluster.waitForClusterToBeReady();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org