Posted to commits@ozone.apache.org by sa...@apache.org on 2020/02/10 02:23:22 UTC
[hadoop-ozone] 04/18: HDDS-1569 Support creating multiple pipelines with same datanode. Contributed by Li Cheng.
This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch HDDS-1564
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 1ef5e6df3ed15e93296a9c0f3c2460cdec423e00
Author: Li Cheng <bl...@gmail.com>
AuthorDate: Tue Oct 29 12:46:00 2019 +0800
HDDS-1569 Support creating multiple pipelines with same datanode. Contributed by Li Cheng.
This closes #28
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 10 +-
.../common/src/main/resources/ozone-default.xml | 15 ++-
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 5 +
.../ContainerPlacementPolicyFactory.java | 8 +-
.../hdds/scm/node/states/Node2PipelineMap.java | 2 +-
.../scm/pipeline/BackgroundPipelineCreator.java | 1 +
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 89 +++++++++----
.../hadoop/hdds/scm/pipeline/PipelineStateMap.java | 5 +-
.../hdds/scm/pipeline/RatisPipelineProvider.java | 137 ++++++++++++---------
.../hdds/scm/pipeline/RatisPipelineUtils.java | 96 +++++++++++++++
.../hdds/scm/pipeline/SCMPipelineManager.java | 13 +-
.../hdds/scm/pipeline/SCMPipelineMetrics.java | 8 ++
.../scm/safemode/HealthyPipelineSafeModeRule.java | 13 +-
.../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 3 +
.../scm/pipeline/TestPipelinePlacementPolicy.java | 15 ++-
.../hdds/scm/pipeline/TestSCMPipelineManager.java | 17 +--
.../apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java | 3 +
.../hdds/scm/pipeline/TestPipelineClose.java | 1 +
.../TestRatisPipelineCreateAndDestroy.java | 24 +++-
.../hadoop/hdds/scm/pipeline/TestSCMRestart.java | 5 +-
.../safemode/TestSCMSafeModeWithPipelineRules.java | 3 +
.../org/apache/hadoop/ozone/MiniOzoneCluster.java | 12 ++
.../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 4 +
.../ozone/client/rpc/Test2WayCommitInRatis.java | 1 +
.../ozone/client/rpc/TestBlockOutputStream.java | 1 +
.../rpc/TestBlockOutputStreamWithFailures.java | 7 +-
.../hadoop/ozone/client/rpc/TestCommitWatcher.java | 1 +
.../rpc/TestContainerReplicationEndToEnd.java | 5 +-
.../client/rpc/TestContainerStateMachine.java | 5 +-
.../client/rpc/TestDeleteWithSlowFollower.java | 12 +-
.../client/rpc/TestFailureHandlingByClient.java | 4 +-
.../client/rpc/TestHybridPipelineOnDatanode.java | 3 +-
.../ozone/client/rpc/TestKeyInputStream.java | 1 +
.../rpc/TestMultiBlockWritesWithDnFailures.java | 8 +-
.../rpc/TestOzoneClientRetriesOnException.java | 1 +
.../client/rpc/TestOzoneRpcClientAbstract.java | 1 +
.../ozone/client/rpc/TestWatchForCommit.java | 3 +
.../TestCloseContainerByPipeline.java | 5 +
.../hadoop/ozone/freon/TestDataValidate.java | 2 +-
.../ozone/freon/TestFreonWithPipelineDestroy.java | 1 +
.../TestSCMContainerPlacementPolicyMetrics.java | 1 +
.../hadoop/ozone/scm/node/TestQueryNode.java | 3 +
42 files changed, 419 insertions(+), 135 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 9fa71ad..15a47f3 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -296,7 +296,15 @@ public final class ScmConfigKeys {
// the max number of pipelines a single datanode can be engaged in.
public static final String OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT =
"ozone.scm.datanode.max.pipeline.engagement";
- public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 5;
+ // Setting to zero by default means this limit doesn't take effect.
+ public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 0;
+
+ // Upper limit for how many pipelines can be created.
+ // Only for test purposes now.
+ public static final String OZONE_SCM_PIPELINE_NUMBER_LIMIT =
+ "ozone.scm.pipeline.number.limit";
+ // Setting to zero by default means this limit doesn't take effect.
+ public static final int OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT = 0;
public static final String
OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
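For reference, the new keys are read like any other ScmConfigKeys entry; a minimal sketch of the lookup (mirroring the RatisPipelineProvider change later in this commit), where conf is an org.apache.hadoop.conf.Configuration:

    int pipelineNumberLimit = conf.getInt(
        ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
        ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT);        // 0 = no limit
    int maxPipelinePerDatanode = conf.getInt(
        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
        ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT); // 0 = no limit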
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index 4144500..8bc2ea1 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -791,10 +791,19 @@
</description>
</property>
<property>
- <name>ozone.scm.datanode.max.pipeline.engagement</name>
- <value>5</value>
+ <name>ozone.scm.datanode.max.pipeline.engagement</name>
+ <value>0</value>
+ <tag>OZONE, SCM, PIPELINE</tag>
+ <description>Max number of pipelines a single datanode can be engaged in.
+ </description>
+ </property>
+ <property>
+ <name>ozone.scm.pipeline.number.limit</name>
+ <value>0</value>
<tag>OZONE, SCM, PIPELINE</tag>
- <description>Max number of pipelines per datanode can be engaged in.
+ <description>Upper limit for how many pipelines can be OPEN in SCM.
+ The default of 0 means there is no limit; otherwise, this value caps
+ the number of pipelines that may be OPEN.
</description>
</property>
<property>
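To enable either limit, an operator would override the defaults in ozone-site.xml; the values below are illustrative only:

    <property>
      <name>ozone.scm.datanode.max.pipeline.engagement</name>
      <value>3</value>
    </property>
    <property>
      <name>ozone.scm.pipeline.number.limit</name>
      <value>10</value>
    </property>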
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index b7a7525..cdc3878 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -197,8 +197,13 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
// TODO: #CLUTIL Remove creation logic when all replication types and
// factors are handled by pipeline creator
pipeline = pipelineManager.createPipeline(type, factor);
+
// wait until pipeline is ready
pipelineManager.waitPipelineReady(pipeline.getId(), 0);
+ } catch (SCMException se) {
+ LOG.warn("Pipeline creation failed for type:{} factor:{}. " +
+ "Datanodes may be used up.", type, factor, se);
+ break;
} catch (IOException e) {
LOG.warn("Pipeline creation failed for type:{} factor:{}. Retrying " +
"get pipelines call once.", type, factor, e);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
index adaeb87..74431f9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
@@ -43,10 +43,10 @@ public final class ContainerPlacementPolicyFactory {
}
- public static PlacementPolicy getPolicy(Configuration conf,
- final NodeManager nodeManager, NetworkTopology clusterMap,
- final boolean fallback, SCMContainerPlacementMetrics metrics)
- throws SCMException{
+ public static PlacementPolicy getPolicy(
+ Configuration conf, final NodeManager nodeManager,
+ NetworkTopology clusterMap, final boolean fallback,
+ SCMContainerPlacementMetrics metrics) throws SCMException {
final Class<? extends PlacementPolicy> placementClass = conf
.getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
index 714188d..18809ed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
@@ -80,7 +80,7 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
dn2ObjectMap.computeIfPresent(dnId,
(k, v) -> {
v.remove(pipeline.getId());
- return v;
+ return v.isEmpty() ? null : v;
});
}
}
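The Node2PipelineMap fix relies on Map.computeIfPresent semantics: when the remapping function returns null, the whole mapping is removed, so a datanode with no remaining pipelines no longer lingers in the map with an empty set. A self-contained illustration with hypothetical keys:

    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ConcurrentHashMap;

    Map<String, Set<Integer>> map = new ConcurrentHashMap<>();
    Set<Integer> pipelines = new HashSet<>();
    pipelines.add(1);
    map.put("dn1", pipelines);
    map.computeIfPresent("dn1", (k, v) -> {
      v.remove(1);
      return v.isEmpty() ? null : v;   // returning null removes the mapping
    });
    System.out.println(map.containsKey("dn1"));  // false: no empty-set leftover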
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index 3006987..b663f2a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -115,6 +115,7 @@ class BackgroundPipelineCreator {
if (scheduler.isClosed()) {
break;
}
+
pipelineManager.createPipeline(type, factor);
} catch (IOException ioe) {
break;
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 1983ed6..23eb574 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -36,6 +36,7 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Set;
import java.util.stream.Collectors;
/**
@@ -52,6 +53,7 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
static final Logger LOG =
LoggerFactory.getLogger(PipelinePlacementPolicy.class);
private final NodeManager nodeManager;
+ private final PipelineStateManager stateManager;
private final Configuration conf;
private final int heavyNodeCriteria;
@@ -59,15 +61,17 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
* Constructs a pipeline placement policy considering network topology,
* load balancing and rack awareness.
*
- * @param nodeManager Node Manager
+ * @param nodeManager NodeManager
+ * @param stateManager PipelineStateManager
* @param conf Configuration
*/
- public PipelinePlacementPolicy(
- final NodeManager nodeManager, final Configuration conf) {
+ public PipelinePlacementPolicy(final NodeManager nodeManager,
+ final PipelineStateManager stateManager, final Configuration conf) {
super(nodeManager, conf);
this.nodeManager = nodeManager;
this.conf = conf;
- heavyNodeCriteria = conf.getInt(
+ this.stateManager = stateManager;
+ this.heavyNodeCriteria = conf.getInt(
ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
}
@@ -76,11 +80,46 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
* Returns true if this node meets the criteria.
*
* @param datanodeDetails DatanodeDetails
+ * @param nodesRequired nodes required count
* @return true if we have enough space.
*/
@VisibleForTesting
- boolean meetCriteria(DatanodeDetails datanodeDetails, long heavyNodeLimit) {
- return (nodeManager.getPipelinesCount(datanodeDetails) <= heavyNodeLimit);
+ boolean meetCriteria(DatanodeDetails datanodeDetails, int nodesRequired) {
+ if (heavyNodeCriteria == 0) {
+ // no limit applied.
+ return true;
+ }
+ // Datanodes from pipelines in some states can also be considered available
+ // for pipeline allocation. Thus the number of such pipelines shall be
+ // deducted from the total heaviness calculation.
+ int pipelineNumDeductable = 0;
+ Set<PipelineID> pipelines = nodeManager.getPipelines(datanodeDetails);
+ for (PipelineID pid : pipelines) {
+ Pipeline pipeline;
+ try {
+ pipeline = stateManager.getPipeline(pid);
+ } catch (PipelineNotFoundException e) {
+ LOG.error("Pipeline not found in pipeline state manager during" +
+ " pipeline creation. PipelineID: " + pid +
+ " exception: " + e.getMessage());
+ continue;
+ }
+ if (pipeline != null &&
+ pipeline.getFactor().getNumber() == nodesRequired &&
+ pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
+ pipeline.getPipelineState() == Pipeline.PipelineState.CLOSED) {
+ pipelineNumDeductable++;
+ }
+ }
+ boolean meet = (nodeManager.getPipelinesCount(datanodeDetails)
+ - pipelineNumDeductable) < heavyNodeCriteria;
+ if (!meet) {
+ LOG.info("Pipeline Placement: can't place more pipeline on heavy " +
+ "datanodeļ¼ " + datanodeDetails.getUuid().toString() + " Heaviness: " +
+ nodeManager.getPipelinesCount(datanodeDetails) + " limit: " +
+ heavyNodeCriteria);
+ }
+ return meet;
}
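A worked example of the deduction, with illustrative numbers: if heavyNodeCriteria is 5 and a datanode is engaged in 6 pipelines, 2 of which are CLOSED RATIS pipelines of the required factor, its effective load is 6 - 2 = 4; since 4 < 5, the node still meets the criteria.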
/**
@@ -102,18 +141,19 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
if (excludedNodes != null) {
healthyNodes.removeAll(excludedNodes);
}
+ int initialHealthyNodesCount = healthyNodes.size();
String msg;
- if (healthyNodes.size() == 0) {
+ if (initialHealthyNodesCount == 0) {
msg = "No healthy node found to allocate pipeline.";
LOG.error(msg);
throw new SCMException(msg, SCMException.ResultCodes
.FAILED_TO_FIND_HEALTHY_NODES);
}
- if (healthyNodes.size() < nodesRequired) {
+ if (initialHealthyNodesCount < nodesRequired) {
msg = String.format("Not enough healthy nodes to allocate pipeline. %d "
+ " datanodes required. Found %d",
- nodesRequired, healthyNodes.size());
+ nodesRequired, initialHealthyNodesCount);
LOG.error(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -121,14 +161,17 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
// filter nodes that meet the size and pipeline engagement criteria.
// Pipeline placement doesn't take node space left into account.
- List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
- meetCriteria(d, heavyNodeCriteria)).collect(Collectors.toList());
+ List<DatanodeDetails> healthyList = healthyNodes.stream()
+ .filter(d -> meetCriteria(d, nodesRequired)).limit(nodesRequired)
+ .collect(Collectors.toList());
if (healthyList.size() < nodesRequired) {
msg = String.format("Unable to find enough nodes that meet " +
"the criteria that cannot engage in more than %d pipelines." +
- " Nodes required: %d Found: %d",
- heavyNodeCriteria, nodesRequired, healthyList.size());
+ " Nodes required: %d Found: %d, healthy nodes count in " +
+ "NodeManager: %d.",
+ heavyNodeCriteria, nodesRequired, healthyList.size(),
+ initialHealthyNodesCount);
LOG.error(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -154,13 +197,11 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
// and make sure excludedNodes are excluded from list.
List<DatanodeDetails> healthyNodes =
filterViableNodes(excludedNodes, nodesRequired);
-
- // Randomly picks nodes when all nodes are equal.
+
+ // Randomly picks nodes when all nodes are equal or factor is ONE.
// This happens when network topology is absent or
// all nodes are on the same rack.
if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
- LOG.info("All nodes are considered equal. Now randomly pick nodes. " +
- "Required nodes: {}", nodesRequired);
return super.getResultSet(nodesRequired, healthyNodes);
} else {
// Since topology and rack awareness are available, picks nodes
@@ -188,8 +229,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
// First choose an anchor nodes randomly
DatanodeDetails anchor = chooseNode(healthyNodes);
if (anchor == null) {
- LOG.error("Unable to find the first healthy nodes that " +
- "meet the criteria. Required nodes: {}, Found nodes: {}",
+ LOG.error("Pipeline Placement: Unable to find the first healthy nodes " +
+ "that meet the criteria. Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -204,8 +245,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
healthyNodes, exclude,
nodeManager.getClusterNetworkTopologyMap(), anchor);
if (nodeOnDifferentRack == null) {
- LOG.error("Unable to find nodes on different racks that " +
- "meet the criteria. Required nodes: {}, Found nodes: {}",
+ LOG.error("Pipeline Placement: Unable to find nodes on different racks " +
+ " that meet the criteria. Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -228,9 +269,9 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
}
if (results.size() < nodesRequired) {
- LOG.error("Unable to find the required number of healthy nodes that " +
- "meet the criteria. Required nodes: {}, Found nodes: {}",
- nodesRequired, results.size());
+ LOG.error("Pipeline Placement: Unable to find the required number of " +
+ "healthy nodes that meet the criteria. Required nodes: {}, " +
+ "Found nodes: {}", nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 443378c..8e0f32d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -52,8 +53,8 @@ class PipelineStateMap {
PipelineStateMap() {
// TODO: Use TreeMap for range operations?
- pipelineMap = new HashMap<>();
- pipeline2container = new HashMap<>();
+ pipelineMap = new ConcurrentHashMap<>();
+ pipeline2container = new ConcurrentHashMap<>();
query2OpenPipelines = new HashMap<>();
initializeQueryMap();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index dacc4ca..23b02ed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -20,14 +20,12 @@ package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms
- .SCMContainerPlacementRandom;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -38,8 +36,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -60,6 +56,9 @@ public class RatisPipelineProvider implements PipelineProvider {
private final PipelineStateManager stateManager;
private final Configuration conf;
private final EventPublisher eventPublisher;
+ private final PipelinePlacementPolicy placementPolicy;
+ private int pipelineNumberLimit;
+ private int maxPipelinePerDatanode;
// Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
private final int parallelismForPool = 3;
@@ -82,65 +81,93 @@ public class RatisPipelineProvider implements PipelineProvider {
this.stateManager = stateManager;
this.conf = conf;
this.eventPublisher = eventPublisher;
+ this.placementPolicy =
+ new PipelinePlacementPolicy(nodeManager, stateManager, conf);
+ this.pipelineNumberLimit = conf.getInt(
+ ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
+ ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT_DEFAULT);
+ this.maxPipelinePerDatanode = conf.getInt(
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT);
}
+ private List<DatanodeDetails> pickNodesNeverUsed(ReplicationFactor factor)
+ throws SCMException {
+ Set<DatanodeDetails> dnsUsed = new HashSet<>();
+ stateManager.getPipelines(ReplicationType.RATIS, factor)
+ .stream().filter(
+ p -> p.getPipelineState().equals(PipelineState.OPEN) ||
+ p.getPipelineState().equals(PipelineState.DORMANT) ||
+ p.getPipelineState().equals(PipelineState.ALLOCATED))
+ .forEach(p -> dnsUsed.addAll(p.getNodes()));
- /**
- * Create pluggable container placement policy implementation instance.
- *
- * @param nodeManager - SCM node manager.
- * @param conf - configuration.
- * @return SCM container placement policy implementation instance.
- */
- @SuppressWarnings("unchecked")
- // TODO: should we rename PlacementPolicy to PipelinePlacementPolicy?
- private static PlacementPolicy createContainerPlacementPolicy(
- final NodeManager nodeManager, final Configuration conf) {
- Class<? extends PlacementPolicy> implClass =
- (Class<? extends PlacementPolicy>) conf.getClass(
- ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
- SCMContainerPlacementRandom.class);
+ // Get list of healthy nodes
+ List<DatanodeDetails> dns = nodeManager
+ .getNodes(HddsProtos.NodeState.HEALTHY)
+ .parallelStream()
+ .filter(dn -> !dnsUsed.contains(dn))
+ .limit(factor.getNumber())
+ .collect(Collectors.toList());
+ if (dns.size() < factor.getNumber()) {
+ String e = String
+ .format("Cannot create pipeline of factor %d using %d nodes." +
+ " Used %d nodes. Healthy nodes %d", factor.getNumber(),
+ dns.size(), dnsUsed.size(),
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size());
+ throw new SCMException(e,
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+ return dns;
+ }
- try {
- Constructor<? extends PlacementPolicy> ctor =
- implClass.getDeclaredConstructor(NodeManager.class,
- Configuration.class);
- return ctor.newInstance(nodeManager, conf);
- } catch (RuntimeException e) {
- throw e;
- } catch (InvocationTargetException e) {
- throw new RuntimeException(implClass.getName()
- + " could not be constructed.", e.getCause());
- } catch (Exception e) {
-// LOG.error("Unhandled exception occurred, Placement policy will not " +
-// "be functional.");
- throw new IllegalArgumentException("Unable to load " +
- "PlacementPolicy", e);
+ private boolean exceedPipelineNumberLimit(ReplicationFactor factor) {
+ if (factor != ReplicationFactor.THREE) {
+ // Only put limits for Factor THREE pipelines.
+ return false;
+ }
+ // Per datanode limit
+ if (maxPipelinePerDatanode > 0) {
+ return (stateManager.getPipelines(ReplicationType.RATIS, factor).size() -
+ stateManager.getPipelines(ReplicationType.RATIS, factor,
+ Pipeline.PipelineState.CLOSED).size()) > maxPipelinePerDatanode *
+ nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY) /
+ factor.getNumber();
+ }
+
+ // Global limit
+ if (pipelineNumberLimit > 0) {
+ return (stateManager.getPipelines(ReplicationType.RATIS,
+ ReplicationFactor.THREE).size() - stateManager.getPipelines(
+ ReplicationType.RATIS, ReplicationFactor.THREE,
+ Pipeline.PipelineState.CLOSED).size()) >
+ (pipelineNumberLimit - stateManager.getPipelines(
+ ReplicationType.RATIS, ReplicationFactor.ONE).size());
}
+
+ return false;
}
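Worked examples for both branches, with illustrative numbers: with maxPipelinePerDatanode = 2 and 9 healthy datanodes, factor-THREE pipelines are capped at 2 * 9 / 3 = 6 non-CLOSED pipelines; with a global pipelineNumberLimit of 10 and 4 existing factor-ONE pipelines, creation stops once 10 - 4 = 6 non-CLOSED factor-THREE pipelines exist.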
@Override
public Pipeline create(ReplicationFactor factor) throws IOException {
- // Get set of datanodes already used for ratis pipeline
- Set<DatanodeDetails> dnsUsed = new HashSet<>();
- stateManager.getPipelines(ReplicationType.RATIS, factor).stream().filter(
- p -> p.getPipelineState().equals(PipelineState.OPEN) ||
- p.getPipelineState().equals(PipelineState.DORMANT) ||
- p.getPipelineState().equals(PipelineState.ALLOCATED))
- .forEach(p -> dnsUsed.addAll(p.getNodes()));
+ if (exceedPipelineNumberLimit(factor)) {
+ throw new SCMException("Ratis pipeline number meets the limit: " +
+ pipelineNumberLimit + " factor : " +
+ factor.getNumber(),
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
- // Get list of healthy nodes
- List<DatanodeDetails> dns =
- nodeManager.getNodes(NodeState.HEALTHY)
- .parallelStream()
- .filter(dn -> !dnsUsed.contains(dn))
- .limit(factor.getNumber())
- .collect(Collectors.toList());
- if (dns.size() < factor.getNumber()) {
- String e = String
- .format("Cannot create pipeline of factor %d using %d nodes.",
- factor.getNumber(), dns.size());
- throw new InsufficientDatanodesException(e);
+ List<DatanodeDetails> dns;
+
+ switch(factor) {
+ case ONE:
+ dns = pickNodesNeverUsed(ReplicationFactor.ONE);
+ break;
+ case THREE:
+ dns = placementPolicy.chooseDatanodes(null,
+ null, factor.getNumber(), 0);
+ break;
+ default:
+ throw new IllegalStateException("Unknown factor: " + factor.name());
}
Pipeline pipeline = Pipeline.newBuilder()
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
new file mode 100644
index 0000000..b8cdf06
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.grpc.GrpcTlsConfig;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Utility class for Ratis pipelines. Contains methods to create and destroy
+ * ratis pipelines.
+ */
+public final class RatisPipelineUtils {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RatisPipelineUtils.class);
+
+ private RatisPipelineUtils() {
+ }
+ /**
+ * Removes pipeline from SCM. Sends ratis command to destroy pipeline on all
+ * the datanodes.
+ *
+ * @param pipeline - Pipeline to be destroyed
+ * @param ozoneConf - Ozone configuration
+ * @param grpcTlsConfig - gRPC TLS configuration
+ */
+ public static void destroyPipeline(Pipeline pipeline, Configuration ozoneConf,
+ GrpcTlsConfig grpcTlsConfig) {
+ final RaftGroup group = RatisHelper.newRaftGroup(pipeline);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("destroying pipeline:{} with {}", pipeline.getId(), group);
+ }
+ for (DatanodeDetails dn : pipeline.getNodes()) {
+ try {
+ destroyPipeline(dn, pipeline.getId(), ozoneConf, grpcTlsConfig);
+ } catch (IOException e) {
+ LOG.warn("Pipeline destroy failed for pipeline={} dn={} exception={}",
+ pipeline.getId(), dn, e.getMessage());
+ }
+ }
+ }
+
+ /**
+ * Sends ratis command to destroy pipeline on the given datanode.
+ *
+ * @param dn - Datanode on which pipeline needs to be destroyed
+ * @param pipelineID - ID of pipeline to be destroyed
+ * @param ozoneConf - Ozone configuration
+ * @param grpcTlsConfig - grpc tls configuration
+ * @throws IOException
+ */
+ static void destroyPipeline(DatanodeDetails dn, PipelineID pipelineID,
+ Configuration ozoneConf, GrpcTlsConfig grpcTlsConfig) throws IOException {
+ final String rpcType = ozoneConf
+ .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
+ ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+ final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
+ final RaftPeer p = RatisHelper.toRaftPeer(dn);
+ try(RaftClient client = RatisHelper
+ .newRaftClient(SupportedRpcType.valueOfIgnoreCase(rpcType), p,
+ retryPolicy, grpcTlsConfig, ozoneConf)) {
+ client.groupRemove(RaftGroupId.valueOf(pipelineID.getId()),
+ true, p.getId());
+ }
+ }
+}
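A typical call site simply hands over the pipeline and configuration; a minimal sketch (passing a null GrpcTlsConfig here is an assumption standing in for an insecure cluster):

    // tear the pipeline down on every member datanode; per-datanode
    // failures are logged and skipped inside destroyPipeline
    RatisPipelineUtils.destroyPipeline(pipeline, ozoneConf, null);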
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 32aa7b6..035d604 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -55,10 +55,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import static org.apache.hadoop.hdds.scm
- .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm
- .ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;
/**
@@ -109,8 +105,8 @@ public class SCMPipelineManager implements PipelineManager {
scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
this.backgroundPipelineCreator =
new BackgroundPipelineCreator(this, scheduler, conf);
- int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
- OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+ int cacheSize = conf.getInt(ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB,
+ ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
final File pipelineDBPath = getPipelineDBPath(conf);
this.pipelineStore =
MetadataStoreBuilder.newBuilder()
@@ -176,10 +172,9 @@ public class SCMPipelineManager implements PipelineManager {
metrics.createPerPipelineMetrics(pipeline);
}
return pipeline;
- } catch (InsufficientDatanodesException idEx) {
- throw idEx;
} catch (IOException ex) {
metrics.incNumPipelineCreationFailed();
+ LOG.error("Pipeline creation failed.", ex);
throw ex;
} finally {
lock.writeLock().unlock();
@@ -188,7 +183,7 @@ public class SCMPipelineManager implements PipelineManager {
@Override
public Pipeline createPipeline(ReplicationType type, ReplicationFactor factor,
- List<DatanodeDetails> nodes) {
+ List<DatanodeDetails> nodes) {
// This will mostly be used to create dummy pipeline for SimplePipelines.
// We don't update the metrics for SimplePipelines.
lock.writeLock().lock();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
index 40a6f29..8c348ed 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineMetrics.java
@@ -135,6 +135,14 @@ public final class SCMPipelineMetrics implements MetricsSource {
}
/**
+ * Get the number of pipelines created.
+ * @return number of pipelines created
+ */
+ long getNumPipelineCreated() {
+ return numPipelineCreated.value();
+ }
+
+ /**
* Increments number of failed pipeline creation count.
*/
void incNumPipelineCreationFailed() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 33936d5..1a03c34 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -17,11 +17,14 @@
*/
package org.apache.hadoop.hdds.scm.safemode;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -46,6 +49,8 @@ public class HealthyPipelineSafeModeRule
private int healthyPipelineThresholdCount;
private int currentHealthyPipelineCount = 0;
private final double healthyPipelinesPercent;
+ private final Set<PipelineID> processedPipelineIDs =
+ new HashSet<>();
HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
PipelineManager pipelineManager,
@@ -117,8 +122,11 @@ public class HealthyPipelineSafeModeRule
Preconditions.checkNotNull(pipeline);
if (pipeline.getType() == HddsProtos.ReplicationType.RATIS &&
pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE) {
- getSafeModeMetrics().incCurrentHealthyPipelinesCount();
- currentHealthyPipelineCount++;
+ if (!processedPipelineIDs.contains(pipeline.getId())) {
+ getSafeModeMetrics().incCurrentHealthyPipelinesCount();
+ currentHealthyPipelineCount++;
+ processedPipelineIDs.add(pipeline.getId());
+ }
}
if (scmInSafeMode()) {
@@ -131,6 +139,7 @@ public class HealthyPipelineSafeModeRule
@Override
protected void cleanup() {
+ processedPipelineIDs.clear();
}
@VisibleForTesting
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 594ea5c..977038e 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -66,6 +66,8 @@ import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Test DeadNodeHandler.
*/
@@ -87,6 +89,7 @@ public class TestDeadNodeHandler {
storageDir = GenericTestUtils.getTempPath(
TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
eventQueue = new EventQueue();
scm = HddsTestUtils.getScm(conf);
nodeManager = (SCMNodeManager) scm.getScmNodeManager();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 2e0d0b1..1e34039 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -34,11 +34,14 @@ import org.junit.Test;
import java.util.*;
import java.util.stream.Collectors;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Test for PipelinePlacementPolicy.
*/
public class TestPipelinePlacementPolicy {
private MockNodeManager nodeManager;
+ private OzoneConfiguration conf;
private PipelinePlacementPolicy placementPolicy;
private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
@@ -46,8 +49,10 @@ public class TestPipelinePlacementPolicy {
public void init() throws Exception {
nodeManager = new MockNodeManager(true,
PIPELINE_PLACEMENT_MAX_NODES_COUNT);
- placementPolicy =
- new PipelinePlacementPolicy(nodeManager, new OzoneConfiguration());
+ conf = new OzoneConfiguration();
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
+ placementPolicy = new PipelinePlacementPolicy(
+ nodeManager, new PipelineStateManager(conf), conf);
}
@Test
@@ -123,7 +128,7 @@ public class TestPipelinePlacementPolicy {
public void testHeavyNodeShouldBeExcluded() throws SCMException{
List<DatanodeDetails> healthyNodes =
nodeManager.getNodes(HddsProtos.NodeState.HEALTHY);
- int nodesRequired = healthyNodes.size()/2;
+ int nodesRequired = HddsProtos.ReplicationFactor.THREE.getNumber();
// only minority of healthy NODES are heavily engaged in pipelines.
int minorityHeavy = healthyNodes.size()/2 - 1;
List<DatanodeDetails> pickedNodes1 = placementPolicy.chooseDatanodes(
@@ -179,7 +184,9 @@ public class TestPipelinePlacementPolicy {
}
int considerHeavyCount =
- ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT + 1;
+ conf.getInt(
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT) + 1;
Node2PipelineMap mockMap = new Node2PipelineMap();
for (DatanodeDetails node : nodes) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 81723e1..08f5185 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.pipeline;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@@ -34,12 +35,13 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.PipelineReportFromDatanode;
-import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
@@ -59,6 +61,7 @@ public class TestSCMPipelineManager {
@Before
public void setUp() throws Exception {
conf = new OzoneConfiguration();
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
testDir = GenericTestUtils
.getTestDir(TestSCMPipelineManager.class.getSimpleName());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
@@ -253,10 +256,10 @@ public class TestSCMPipelineManager {
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
Assert.fail();
- } catch (InsufficientDatanodesException idEx) {
- Assert.assertEquals(
- "Cannot create pipeline of factor 3 using 1 nodes.",
- idEx.getMessage());
+ } catch (SCMException ioe) {
+ // pipeline creation failed this time.
+ Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
+ ioe.getResult());
}
metrics = getMetrics(
@@ -266,7 +269,7 @@ public class TestSCMPipelineManager {
numPipelineCreateFailed = getLongCounter(
"NumPipelineCreationFailed", metrics);
- Assert.assertTrue(numPipelineCreateFailed == 0);
+ Assert.assertTrue(numPipelineCreateFailed == 1);
// clean up
pipelineManager.close();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
index 23d7833..acc4031 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFsHAURLs.java
@@ -97,6 +97,7 @@ public class TestOzoneFsHAURLs {
conf.setTimeDuration(
OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
OMStorage omStore = new OMStorage(conf);
omStore.setClusterId(clusterId);
@@ -106,6 +107,8 @@ public class TestOzoneFsHAURLs {
// Start the cluster
cluster = MiniOzoneCluster.newHABuilder(conf)
+ .setNumDatanodes(7)
+ .setTotalPipelineNumLimit(10)
.setClusterId(clusterId)
.setScmId(scmId)
.setOMServiceId(omServiceId)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index 21fa7bd..aba9cae 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -170,6 +170,7 @@ public class TestPipelineClose {
pipelineActionHandler
.onMessage(pipelineActionsFromDatanode, new EventQueue());
Thread.sleep(5000);
+
OzoneContainer ozoneContainer =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index 3590e43..fc90ee9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.HddsDatanodeService;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@@ -36,6 +37,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
@@ -51,9 +53,12 @@ public class TestRatisPipelineCreateAndDestroy {
public void init(int numDatanodes) throws Exception {
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS,
GenericTestUtils.getRandomizedTempPath());
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numDatanodes)
- .setHbInterval(1000)
+ .setTotalPipelineNumLimit(numDatanodes + numDatanodes/3)
+ .setHbInterval(2000)
.setHbProcessorInterval(1000)
.build();
cluster.waitForClusterToBeReady();
@@ -134,7 +139,9 @@ public class TestRatisPipelineCreateAndDestroy {
} catch (IOException ioe) {
// As all datanodes are now shut down, they move to the stale state, and
// there will not be sufficient datanodes to create the pipeline.
- Assert.assertTrue(ioe instanceof InsufficientDatanodesException);
+ Assert.assertTrue(ioe instanceof SCMException);
+ Assert.assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
+ ((SCMException) ioe).getResult());
}
// make sure pipelines are destroyed
@@ -147,9 +154,14 @@ public class TestRatisPipelineCreateAndDestroy {
for (Pipeline pipeline : pipelines) {
pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
}
- // make sure pipelines is created after node start
- pipelineManager.triggerPipelineCreation();
- waitForPipelines(1);
+
+ if (cluster.getStorageContainerManager()
+ .getScmNodeManager().getNodeCount(HddsProtos.NodeState.HEALTHY) >=
+ HddsProtos.ReplicationFactor.THREE.getNumber()) {
+ // make sure pipelines are created after node start
+ pipelineManager.triggerPipelineCreation();
+ waitForPipelines(1);
+ }
}
private void waitForPipelines(int numPipelines)
@@ -157,6 +169,6 @@ public class TestRatisPipelineCreateAndDestroy {
GenericTestUtils.waitFor(() -> pipelineManager
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE, Pipeline.PipelineState.OPEN)
- .size() == numPipelines, 100, 40000);
+ .size() >= numPipelines, 100, 40000);
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
index 459a67a..baeee6a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMRestart.java
@@ -57,8 +57,11 @@ public class TestSCMRestart {
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
+ int numOfNodes = 4;
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(4)
+ .setNumDatanodes(numOfNodes)
+ // allow only one FACTOR THREE pipeline.
+ .setTotalPipelineNumLimit(numOfNodes + 1)
.setHbInterval(1000)
.setHbProcessorInterval(1000)
.build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
index 866d0b0..4b35317 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeWithPipelineRules.java
@@ -39,6 +39,7 @@ import org.junit.rules.TemporaryFolder;
import java.util.List;
import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.junit.Assert.fail;
/**
@@ -64,6 +65,8 @@ public class TestSCMSafeModeWithPipelineRules {
true);
conf.set(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT, "10s");
conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, "10s");
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 50);
+
clusterBuilder = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numDatanodes)
.setHbInterval(1000)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index de27d5a..0042363 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -251,6 +251,7 @@ public interface MiniOzoneCluster {
protected static final int DEFAULT_HB_INTERVAL_MS = 1000;
protected static final int DEFAULT_HB_PROCESSOR_INTERVAL_MS = 100;
protected static final int ACTIVE_OMS_NOT_SET = -1;
+ protected static final int DEFAULT_PIPELINE_LIMIT = 3;
protected final OzoneConfiguration conf;
protected String path;
@@ -278,6 +279,7 @@ public interface MiniOzoneCluster {
protected int numOfDatanodes = 3;
protected boolean startDataNodes = true;
protected CertificateClient certClient;
+ protected int pipelineNumLimit = DEFAULT_PIPELINE_LIMIT;
protected Builder(OzoneConfiguration conf) {
this.conf = conf;
@@ -365,6 +367,16 @@ public interface MiniOzoneCluster {
}
/**
+ * Sets an upper limit on the total number of pipelines the cluster creates.
+ * @param val pipeline count limit
+ * @return MiniOzoneCluster.Builder
+ */
+ public Builder setTotalPipelineNumLimit(int val) {
+ pipelineNumLimit = val;
+ return this;
+ }
+
+ /**
* Sets the number of HeartBeat Interval of Datanodes, the value should be
* in MilliSeconds.
*
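Several integration tests below adopt the new knob together with a larger datanode count; the typical builder pattern is:

    cluster = MiniOzoneCluster.newBuilder(conf)
        .setNumDatanodes(7)
        .setTotalPipelineNumLimit(10)
        .build();
    cluster.waitForClusterToBeReady();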
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index c2e196a..9bfa8bd 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -575,6 +575,10 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
streamBufferMaxSize.get(), streamBufferSizeUnit.get());
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, blockSize.get(),
streamBufferSizeUnit.get());
+ // MiniOzoneCluster should have a global pipeline upper limit.
+ conf.setInt(ScmConfigKeys.OZONE_SCM_PIPELINE_NUMBER_LIMIT,
+ pipelineNumLimit == DEFAULT_PIPELINE_LIMIT ?
+ 2 * numOfDatanodes : pipelineNumLimit);
configureTrace();
}
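With this default in place, a test that never calls setTotalPipelineNumLimit gets a global cap of 2 * numOfDatanodes; for example, the default 3-datanode cluster is limited to 6 pipelines.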
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
index c8bf36b..64ded12 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
@@ -83,6 +83,7 @@ public class Test2WayCommitInRatis {
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setTotalPipelineNumLimit(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 96226d8..fa7783c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -84,6 +84,7 @@ public class TestBlockOutputStream {
StorageUnit.MB);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setTotalPipelineNumLimit(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index e236b85..07e306e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.XceiverClientMetrics;
import org.apache.hadoop.hdds.scm.XceiverClientRatis;
@@ -91,9 +92,11 @@ public class TestBlockOutputStreamWithFailures {
conf.setQuietMode(false);
conf.setStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE, 4,
StorageUnit.MB);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
+
cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(7)
- .setBlockSize(blockSize).setChunkSize(chunkSize)
- .setStreamBufferFlushSize(flushSize)
+ .setTotalPipelineNumLimit(10).setBlockSize(blockSize)
+ .setChunkSize(chunkSize).setStreamBufferFlushSize(flushSize)
.setStreamBufferMaxSize(maxFlushSize)
.setStreamBufferSizeUnit(StorageUnit.BYTES).build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
index 4a5f528..16f50c6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -97,6 +97,7 @@ public class TestCommitWatcher {
StorageUnit.MB);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setTotalPipelineNumLimit(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
index 36f720b..439287e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerReplicationEndToEnd.java
@@ -57,6 +57,7 @@ import java.util.function.Predicate;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_CONTAINER_REPORT_INTERVAL;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_DESTROY_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
/**
* Tests delete key operation with a slow follower in the datanode
@@ -107,10 +108,12 @@ public class TestContainerReplicationEndToEnd {
1000, TimeUnit.SECONDS);
conf.setLong("hdds.scm.replication.thread.interval",
containerReportInterval);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
conf.setQuietMode(false);
cluster =
- MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4).setHbInterval(200)
+ MiniOzoneCluster.newBuilder(conf).setNumDatanodes(4)
+ .setTotalPipelineNumLimit(6).setHbInterval(200)
.build();
cluster.waitForClusterToBeReady();
cluster.getStorageContainerManager().getReplicationManager().start();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
index 6bef060..ba5ed9f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestContainerStateMachine.java
@@ -83,7 +83,7 @@ public class TestContainerStateMachine {
baseDir.mkdirs();
conf.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
- // conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+ // conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
conf.setTimeDuration(HDDS_CONTAINER_REPORT_INTERVAL, 200,
TimeUnit.MILLISECONDS);
conf.setTimeDuration(HDDS_COMMAND_STATUS_REPORT_INTERVAL, 200,
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
index cf96a74..da2d656 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestDeleteWithSlowFollower.java
@@ -83,6 +83,7 @@ public class TestDeleteWithSlowFollower {
private static String bucketName;
private static String path;
private static XceiverClientManager xceiverClientManager;
+ private static final int FACTOR_THREE_PIPELINE_COUNT = 1;
/**
* Create a MiniDFSCluster for testing.
@@ -120,10 +121,13 @@ public class TestDeleteWithSlowFollower {
1000, TimeUnit.SECONDS);
conf.setTimeDuration(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
1, TimeUnit.SECONDS);
-
conf.setQuietMode(false);
- cluster =
- MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).setHbInterval(100)
+ int numOfDatanodes = 3;
+ cluster = MiniOzoneCluster.newBuilder(conf)
+ .setNumDatanodes(numOfDatanodes)
+ .setTotalPipelineNumLimit(
+ numOfDatanodes + FACTOR_THREE_PIPELINE_COUNT)
+ .setHbInterval(100)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
@@ -185,7 +189,7 @@ public class TestDeleteWithSlowFollower {
cluster.getStorageContainerManager().getPipelineManager()
.getPipelines(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
- Assert.assertTrue(pipelineList.size() == 1);
+ Assert.assertTrue(pipelineList.size() >= FACTOR_THREE_PIPELINE_COUNT);
Pipeline pipeline = pipelineList.get(0);
for (HddsDatanodeService dn : cluster.getHddsDatanodes()) {
if (ContainerTestHelper.isRatisFollower(dn, pipeline)) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index 21b51e7..d4e5d7d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.client.ReplicationType;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -98,6 +99,7 @@ public class TestFailureHandlingByClient {
1, TimeUnit.SECONDS);
conf.setBoolean(
OzoneConfigKeys.OZONE_NETWORK_TOPOLOGY_AWARE_READ_KEY, true);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
conf.setQuietMode(false);
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
@@ -106,7 +108,7 @@ public class TestFailureHandlingByClient {
Collections.singleton(HddsUtils.getHostName(conf))).get(0),
"/rack1");
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(10).build();
+ .setNumDatanodes(10).setTotalPipelineNumLimit(15).build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
index 47a716e..75af061 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestHybridPipelineOnDatanode.java
@@ -67,7 +67,8 @@ public class TestHybridPipelineOnDatanode {
@BeforeClass
public static void init() throws Exception {
conf = new OzoneConfiguration();
- cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3).build();
+ cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(3)
+ .setTotalPipelineNumLimit(5).build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index bb7b6f0..5892083 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -82,6 +82,7 @@ public class TestKeyInputStream {
StorageUnit.MB);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
+ .setTotalPipelineNumLimit(5)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
index 281ad4a..9b62923 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
@@ -49,8 +49,9 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
/**
* Tests MultiBlock Writes with Dn failures by Ozone Client.
@@ -88,10 +89,13 @@ public class TestMultiBlockWritesWithDnFailures {
conf.setTimeDuration(
OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
1, TimeUnit.SECONDS);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(datanodes).build();
+ .setNumDatanodes(datanodes)
+ .setTotalPipelineNumLimit(0)
+ .build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
client = OzoneClientFactory.getClient(conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
index 5aefcc8..1bf2ea3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -93,6 +93,7 @@ public class TestOzoneClientRetriesOnException {
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setTotalPipelineNumLimit(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 0027dca..72dfc41 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -165,6 +165,7 @@ public abstract class TestOzoneRpcClientAbstract {
static void startCluster(OzoneConfiguration conf) throws Exception {
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(3)
+ .setTotalPipelineNumLimit(10)
.setScmId(scmId)
.build();
cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index d2007ce..b84e61c 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -58,6 +58,7 @@ import java.util.concurrent.TimeoutException;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_STALENODE_INTERVAL;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
/**
* This class verifies the watchForCommit Handling by xceiverClient.
@@ -95,10 +96,12 @@ public class TestWatchForCommit {
conf.setTimeDuration(
OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
1, TimeUnit.SECONDS);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
conf.setQuietMode(false);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(7)
+ .setTotalPipelineNumLimit(10)
.setBlockSize(blockSize)
.setChunkSize(chunkSize)
.setStreamBufferFlushSize(flushSize)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index c65ce95..8ee47a9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -53,6 +53,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Test container closing.
*/
@@ -75,8 +77,11 @@ public class TestCloseContainerByPipeline {
public static void init() throws Exception {
conf = new OzoneConfiguration();
conf.set(ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT, "1");
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
+
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(10)
+ .setTotalPipelineNumLimit(15)
.build();
cluster.waitForClusterToBeReady();
//the easiest way to create an open container is creating a key
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
index 7857e1f..0c875c9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
@@ -40,7 +40,7 @@ public abstract class TestDataValidate {
*/
static void startCluster(OzoneConfiguration conf) throws Exception {
cluster = MiniOzoneCluster.newBuilder(conf)
- .setNumDatanodes(5).build();
+ .setNumDatanodes(5).setTotalPipelineNumLimit(8).build();
cluster.waitForClusterToBeReady();
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
index 80ef246..5150fd4 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestFreonWithPipelineDestroy.java
@@ -55,6 +55,7 @@ public class TestFreonWithPipelineDestroy {
.setHbProcessorInterval(1000)
.setHbInterval(1000)
.setNumDatanodes(3)
+ .setTotalPipelineNumLimit(8)
.build();
cluster.waitForClusterToBeReady();
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
index 1f9d9fb..4025aca 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestSCMContainerPlacementPolicyMetrics.java
@@ -85,6 +85,7 @@ public class TestSCMContainerPlacementPolicyMetrics {
"/rack1");
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(4)
+ .setTotalPipelineNumLimit(10)
.build();
cluster.waitForClusterToBeReady();
metrics = getMetrics(SCMContainerPlacementMetrics.class.getSimpleName());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
index 841fd85..1ca3110 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestQueryNode.java
@@ -16,6 +16,7 @@
*/
package org.apache.hadoop.ozone.scm.node;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -78,9 +79,11 @@ public class TestQueryNode {
conf.setTimeDuration(HDDS_NODE_REPORT_INTERVAL, 1, SECONDS);
conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, SECONDS);
conf.setTimeDuration(OZONE_SCM_DEADNODE_INTERVAL, 6, SECONDS);
+ conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 3);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(numOfDatanodes)
+ .setTotalPipelineNumLimit(numOfDatanodes + numOfDatanodes / 2)
.build();
cluster.waitForClusterToBeReady();
scmClient = new ContainerOperationClient(conf);
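
The pattern repeated across these test changes bounds pipeline creation
from both ends: OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT caps how many Ratis
pipelines a single datanode may join, and the new
MiniOzoneCluster.Builder#setTotalPipelineNumLimit caps the cluster-wide
pipeline count so that multi-raft pipeline creation stays bounded. Below is
a minimal sketch of that pattern as the tests above use it; the class name,
three-datanode sizing, and limit values are illustrative only and are not
taken from this patch.

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;
    import org.apache.hadoop.ozone.MiniOzoneCluster;

    public class PipelineLimitSketch {
      public static void main(String[] args) throws Exception {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Let each datanode participate in at most two Ratis pipelines.
        conf.setInt(ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 2);
        MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
            .setNumDatanodes(3)
            // Cluster-wide cap on the number of pipelines SCM may create.
            .setTotalPipelineNumLimit(5)
            .build();
        cluster.waitForClusterToBeReady();
        try {
          // ... exercise the cluster, e.g. write a key to open a container ...
        } finally {
          cluster.shutdown();
        }
      }
    }

Note that the tests in this patch size the total limit above the datanode
count (for example numOfDatanodes + numOfDatanodes / 2), so every node can
be engaged without letting background pipeline creation run unbounded.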