You are viewing a plain-text version of this content. The canonical link to it is not included in this text version.
Posted to hdfs-commits@hadoop.apache.org by el...@apache.org on 2019/10/13 06:52:06 UTC
[hadoop-ozone] 01/02: HDDS-1569 Support creating multiple pipelines with same datanode.
This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch HDDS-1569
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit fbef6c3d732e20ae45f9c5d43cd3d57670b90dd3
Author: timmycheng <ti...@tencent.com>
AuthorDate: Wed Sep 25 11:40:12 2019 +0800
HDDS-1569 Support creating multiple pipelines with same datanode.
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 3 +-
.../ContainerPlacementPolicyFactory.java | 8 +--
.../hdds/scm/node/states/Node2PipelineMap.java | 4 ++
.../hdds/scm/pipeline/PipelineActionHandler.java | 2 +-
.../hdds/scm/pipeline/PipelinePlacementPolicy.java | 52 +++++++++++------
.../hadoop/hdds/scm/pipeline/PipelineStateMap.java | 5 +-
.../hdds/scm/pipeline/RatisPipelineProvider.java | 68 ++--------------------
.../hdds/scm/pipeline/SCMPipelineManager.java | 1 +
.../scm/safemode/HealthyPipelineSafeModeRule.java | 65 ++++++++++-----------
.../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 3 +
.../scm/pipeline/TestPipelinePlacementPolicy.java | 11 +++-
.../hdds/scm/pipeline/TestPipelineClose.java | 2 +-
.../scm/pipeline/TestRatisPipelineProvider.java | 32 +++++-----
.../hdds/scm/pipeline/TestSCMPipelineManager.java | 11 ++--
.../org/apache/hadoop/ozone/MiniOzoneCluster.java | 4 ++
15 files changed, 122 insertions(+), 149 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index ad7073e..fe51f51 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -317,7 +317,8 @@ public final class ScmConfigKeys {
// the max number of pipelines can a single datanode be engaged in.
public static final String OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT =
"ozone.scm.datanode.max.pipeline.engagement";
- public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 5;
+ // Setting to zero by default means this limit doesn't take effect.
+ public static final int OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT = 0;
public static final String
OZONE_SCM_KEY_VALUE_CONTAINER_DELETION_CHOOSING_POLICY =
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
index adaeb87..74431f9 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/ContainerPlacementPolicyFactory.java
@@ -43,10 +43,10 @@ public final class ContainerPlacementPolicyFactory {
}
- public static PlacementPolicy getPolicy(Configuration conf,
- final NodeManager nodeManager, NetworkTopology clusterMap,
- final boolean fallback, SCMContainerPlacementMetrics metrics)
- throws SCMException{
+ public static PlacementPolicy getPolicy(
+ Configuration conf, final NodeManager nodeManager,
+ NetworkTopology clusterMap, final boolean fallback,
+ SCMContainerPlacementMetrics metrics) throws SCMException{
final Class<? extends PlacementPolicy> placementClass = conf
.getClass(ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
OZONE_SCM_CONTAINER_PLACEMENT_IMPL_DEFAULT,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
index 714188d..496b9e7 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/states/Node2PipelineMap.java
@@ -71,6 +71,10 @@ public class Node2PipelineMap extends Node2ObjectsMap<PipelineID> {
UUID dnId = details.getUuid();
dn2ObjectMap.computeIfAbsent(dnId, k -> ConcurrentHashMap.newKeySet())
.add(pipeline.getId());
+ dn2ObjectMap.computeIfPresent(dnId, (k, v) -> {
+ v.add(pipeline.getId());
+ return v;
+ });
}
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
index 8d497fa..8d040f1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -57,7 +57,7 @@ public class PipelineActionHandler
pipelineID = PipelineID.
getFromProtobuf(action.getClosePipeline().getPipelineID());
Pipeline pipeline = pipelineManager.getPipeline(pipelineID);
- LOG.error("Received pipeline action {} for {} from datanode {}. " +
+ LOG.info("Received pipeline action {} for {} from datanode {}. " +
"Reason : {}", action.getAction(), pipeline,
report.getDatanodeDetails(),
action.getClosePipeline().getDetailedReason());
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
index 1983ed6..df46fad 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelinePlacementPolicy.java
@@ -79,8 +79,20 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
* @return true if we have enough space.
*/
@VisibleForTesting
- boolean meetCriteria(DatanodeDetails datanodeDetails, long heavyNodeLimit) {
- return (nodeManager.getPipelinesCount(datanodeDetails) <= heavyNodeLimit);
+ boolean meetCriteria(DatanodeDetails datanodeDetails) {
+ if (heavyNodeCriteria == 0) {
+ // no limit applied.
+ return true;
+ }
+ boolean meet = (nodeManager.getPipelinesCount(datanodeDetails)
+ < heavyNodeCriteria);
+ if (!meet) {
+ LOG.info("Pipeline Placement: can't place more pipeline on heavy " +
+ "datanodeļ¼ " + datanodeDetails.getUuid().toString() + " Heaviness: " +
+ nodeManager.getPipelinesCount(datanodeDetails) + " limit: " +
+ heavyNodeCriteria);
+ }
+ return meet;
}
/**
@@ -102,18 +114,19 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
if (excludedNodes != null) {
healthyNodes.removeAll(excludedNodes);
}
+ int initialHealthyNodesCount = healthyNodes.size();
String msg;
- if (healthyNodes.size() == 0) {
+ if (initialHealthyNodesCount == 0) {
msg = "No healthy node found to allocate pipeline.";
LOG.error(msg);
throw new SCMException(msg, SCMException.ResultCodes
.FAILED_TO_FIND_HEALTHY_NODES);
}
- if (healthyNodes.size() < nodesRequired) {
+ if (initialHealthyNodesCount < nodesRequired) {
msg = String.format("Not enough healthy nodes to allocate pipeline. %d "
+ " datanodes required. Found %d",
- nodesRequired, healthyNodes.size());
+ nodesRequired, initialHealthyNodesCount);
LOG.error(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -122,13 +135,15 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
// filter nodes that meet the size and pipeline engagement criteria.
// Pipeline placement doesn't take node space left into account.
List<DatanodeDetails> healthyList = healthyNodes.stream().filter(d ->
- meetCriteria(d, heavyNodeCriteria)).collect(Collectors.toList());
+ meetCriteria(d)).collect(Collectors.toList());
if (healthyList.size() < nodesRequired) {
msg = String.format("Unable to find enough nodes that meet " +
"the criteria that cannot engage in more than %d pipelines." +
- " Nodes required: %d Found: %d",
- heavyNodeCriteria, nodesRequired, healthyList.size());
+ " Nodes required: %d Found: %d, healthy nodes count in" +
+ "NodeManager: %d.",
+ heavyNodeCriteria, nodesRequired, healthyList.size(),
+ initialHealthyNodesCount);
LOG.error(msg);
throw new SCMException(msg,
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -155,12 +170,11 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
List<DatanodeDetails> healthyNodes =
filterViableNodes(excludedNodes, nodesRequired);
- // Randomly picks nodes when all nodes are equal.
+ // Randomly picks nodes when all nodes are equal or factor is ONE.
// This happens when network topology is absent or
// all nodes are on the same rack.
- if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())) {
- LOG.info("All nodes are considered equal. Now randomly pick nodes. " +
- "Required nodes: {}", nodesRequired);
+ if (checkAllNodesAreEqual(nodeManager.getClusterNetworkTopologyMap())
+ || nodesRequired == HddsProtos.ReplicationFactor.ONE.getNumber()) {
return super.getResultSet(nodesRequired, healthyNodes);
} else {
// Since topology and rack awareness are available, picks nodes
@@ -188,8 +202,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
// First choose an anchor nodes randomly
DatanodeDetails anchor = chooseNode(healthyNodes);
if (anchor == null) {
- LOG.error("Unable to find the first healthy nodes that " +
- "meet the criteria. Required nodes: {}, Found nodes: {}",
+ LOG.error("Pipeline Placement: Unable to find the first healthy nodes " +
+ "that meet the criteria. Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -204,8 +218,8 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
healthyNodes, exclude,
nodeManager.getClusterNetworkTopologyMap(), anchor);
if (nodeOnDifferentRack == null) {
- LOG.error("Unable to find nodes on different racks that " +
- "meet the criteria. Required nodes: {}, Found nodes: {}",
+ LOG.error("Pipeline Placement: Unable to find nodes on different racks " +
+ " that meet the criteria. Required nodes: {}, Found nodes: {}",
nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
@@ -228,9 +242,9 @@ public final class PipelinePlacementPolicy extends SCMCommonPlacementPolicy {
}
if (results.size() < nodesRequired) {
- LOG.error("Unable to find the required number of healthy nodes that " +
- "meet the criteria. Required nodes: {}, Found nodes: {}",
- nodesRequired, results.size());
+ LOG.error("Pipeline Placement: Unable to find the required number of " +
+ "healthy nodes that meet the criteria. Required nodes: {}, " +
+ "Found nodes: {}", nodesRequired, results.size());
throw new SCMException("Unable to find required number of nodes.",
SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
index 443378c..8e0f32d 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineStateMap.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;
import java.util.stream.Collectors;
@@ -52,8 +53,8 @@ class PipelineStateMap {
PipelineStateMap() {
// TODO: Use TreeMap for range operations?
- pipelineMap = new HashMap<>();
- pipeline2container = new HashMap<>();
+ pipelineMap = new ConcurrentHashMap<>();
+ pipeline2container = new ConcurrentHashMap<>();
query2OpenPipelines = new HashMap<>();
initializeQueryMap();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index 9409728..6a51957 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -22,11 +22,8 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
-import org.apache.hadoop.hdds.scm.PlacementPolicy;
-import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
import org.apache.hadoop.io.MultipleIOException;
@@ -44,19 +41,14 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
/**
* Implements Api for creating ratis pipelines.
@@ -69,6 +61,7 @@ public class RatisPipelineProvider implements PipelineProvider {
private final NodeManager nodeManager;
private final PipelineStateManager stateManager;
private final Configuration conf;
+ private final PipelinePlacementPolicy placementPolicy;
// Set parallelism at 3, as now in Ratis we create 1 and 3 node pipelines.
private final int parallelismForPool = 3;
@@ -92,66 +85,13 @@ public class RatisPipelineProvider implements PipelineProvider {
this.stateManager = stateManager;
this.conf = conf;
this.tlsConfig = tlsConfig;
- }
-
-
- /**
- * Create pluggable container placement policy implementation instance.
- *
- * @param nodeManager - SCM node manager.
- * @param conf - configuration.
- * @return SCM container placement policy implementation instance.
- */
- @SuppressWarnings("unchecked")
- // TODO: should we rename PlacementPolicy to PipelinePlacementPolicy?
- private static PlacementPolicy createContainerPlacementPolicy(
- final NodeManager nodeManager, final Configuration conf) {
- Class<? extends PlacementPolicy> implClass =
- (Class<? extends PlacementPolicy>) conf.getClass(
- ScmConfigKeys.OZONE_SCM_CONTAINER_PLACEMENT_IMPL_KEY,
- SCMContainerPlacementRandom.class);
-
- try {
- Constructor<? extends PlacementPolicy> ctor =
- implClass.getDeclaredConstructor(NodeManager.class,
- Configuration.class);
- return ctor.newInstance(nodeManager, conf);
- } catch (RuntimeException e) {
- throw e;
- } catch (InvocationTargetException e) {
- throw new RuntimeException(implClass.getName()
- + " could not be constructed.", e.getCause());
- } catch (Exception e) {
-// LOG.error("Unhandled exception occurred, Placement policy will not " +
-// "be functional.");
- throw new IllegalArgumentException("Unable to load " +
- "PlacementPolicy", e);
- }
+ this.placementPolicy = new PipelinePlacementPolicy(nodeManager, conf);
}
@Override
public Pipeline create(ReplicationFactor factor) throws IOException {
- // Get set of datanodes already used for ratis pipeline
- Set<DatanodeDetails> dnsUsed = new HashSet<>();
- stateManager.getPipelines(ReplicationType.RATIS, factor).stream().filter(
- p -> p.getPipelineState().equals(PipelineState.OPEN) ||
- p.getPipelineState().equals(PipelineState.DORMANT) ||
- p.getPipelineState().equals(PipelineState.ALLOCATED))
- .forEach(p -> dnsUsed.addAll(p.getNodes()));
-
- // Get list of healthy nodes
- List<DatanodeDetails> dns =
- nodeManager.getNodes(NodeState.HEALTHY)
- .parallelStream()
- .filter(dn -> !dnsUsed.contains(dn))
- .limit(factor.getNumber())
- .collect(Collectors.toList());
- if (dns.size() < factor.getNumber()) {
- String e = String
- .format("Cannot create pipeline of factor %d using %d nodes.",
- factor.getNumber(), dns.size());
- throw new InsufficientDatanodesException(e);
- }
+ List<DatanodeDetails> dns = placementPolicy.chooseDatanodes(null,
+ null, factor.getNumber(), 0);
Pipeline pipeline = Pipeline.newBuilder()
.setId(PipelineID.randomId())
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 0964f6d..a927d56 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -164,6 +164,7 @@ public class SCMPipelineManager implements PipelineManager {
throw idEx;
} catch (IOException ex) {
metrics.incNumPipelineCreationFailed();
+ LOG.error("Pipeline creation failed.", ex);
throw ex;
} finally {
lock.writeLock().unlock();
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
index 7a00d76..b3aac5e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/HealthyPipelineSafeModeRule.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdds.scm.safemode;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
@@ -56,7 +55,7 @@ public class HealthyPipelineSafeModeRule
private final PipelineManager pipelineManager;
private final int healthyPipelineThresholdCount;
private int currentHealthyPipelineCount = 0;
- private final Set<DatanodeDetails> processedDatanodeDetails =
+ private final Set<PipelineID> processedPipelineIDs =
new HashSet<>();
HealthyPipelineSafeModeRule(String ruleName, EventQueue eventQueue,
@@ -116,46 +115,46 @@ public class HealthyPipelineSafeModeRule
// processed report event, we should not consider this pipeline report
// from datanode again during threshold calculation.
Preconditions.checkNotNull(pipelineReportFromDatanode);
- DatanodeDetails dnDetails = pipelineReportFromDatanode.getDatanodeDetails();
- if (!processedDatanodeDetails.contains(
- pipelineReportFromDatanode.getDatanodeDetails())) {
-
- Pipeline pipeline;
- PipelineReportsProto pipelineReport =
- pipelineReportFromDatanode.getReport();
-
- for (PipelineReport report : pipelineReport.getPipelineReportList()) {
- PipelineID pipelineID = PipelineID
- .getFromProtobuf(report.getPipelineID());
- try {
- pipeline = pipelineManager.getPipeline(pipelineID);
- } catch (PipelineNotFoundException e) {
- continue;
- }
-
- if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
- pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
- // If the pipeline is open state mean, all 3 datanodes are reported
- // for this pipeline.
- currentHealthyPipelineCount++;
- getSafeModeMetrics().incCurrentHealthyPipelinesCount();
- }
+
+ Pipeline pipeline;
+ PipelineReportsProto pipelineReport =
+ pipelineReportFromDatanode.getReport();
+
+ for (PipelineReport report : pipelineReport.getPipelineReportList()) {
+ PipelineID pipelineID = PipelineID
+ .getFromProtobuf(report.getPipelineID());
+ if (processedPipelineIDs.contains(pipelineID)) {
+ continue;
+ }
+
+ try {
+ pipeline = pipelineManager.getPipeline(pipelineID);
+ } catch (PipelineNotFoundException e) {
+ continue;
}
- if (scmInSafeMode()) {
- SCMSafeModeManager.getLogger().info(
- "SCM in safe mode. Healthy pipelines reported count is {}, " +
- "required healthy pipeline reported count is {}",
- currentHealthyPipelineCount, healthyPipelineThresholdCount);
+
+ if (pipeline.getFactor() == HddsProtos.ReplicationFactor.THREE &&
+ pipeline.getPipelineState() == Pipeline.PipelineState.OPEN) {
+ // If the pipeline is open state mean, all 3 datanodes are reported
+ // for this pipeline.
+ currentHealthyPipelineCount++;
+ getSafeModeMetrics().incCurrentHealthyPipelinesCount();
}
- processedDatanodeDetails.add(dnDetails);
+ processedPipelineIDs.add(pipelineID);
}
+ if (scmInSafeMode()) {
+ SCMSafeModeManager.getLogger().info(
+ "SCM in safe mode. Healthy pipelines reported count is {}, " +
+ "required healthy pipeline reported count is {}",
+ currentHealthyPipelineCount, healthyPipelineThresholdCount);
+ }
}
@Override
protected void cleanup() {
- processedDatanodeDetails.clear();
+ processedPipelineIDs.clear();
}
@VisibleForTesting
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 7657b54..20cc3cf 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -64,6 +64,8 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Test DeadNodeHandler.
*/
@@ -84,6 +86,7 @@ public class TestDeadNodeHandler {
storageDir = GenericTestUtils.getTempPath(
TestDeadNodeHandler.class.getSimpleName() + UUID.randomUUID());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, storageDir);
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
eventQueue = new EventQueue();
scm = HddsTestUtils.getScm(conf);
nodeManager = (SCMNodeManager) scm.getScmNodeManager();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
index 2e0d0b1..e200d6f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelinePlacementPolicy.java
@@ -34,11 +34,14 @@ import org.junit.Test;
import java.util.*;
import java.util.stream.Collectors;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Test for PipelinePlacementPolicy.
*/
public class TestPipelinePlacementPolicy {
private MockNodeManager nodeManager;
+ private OzoneConfiguration conf;
private PipelinePlacementPolicy placementPolicy;
private static final int PIPELINE_PLACEMENT_MAX_NODES_COUNT = 10;
@@ -46,8 +49,10 @@ public class TestPipelinePlacementPolicy {
public void init() throws Exception {
nodeManager = new MockNodeManager(true,
PIPELINE_PLACEMENT_MAX_NODES_COUNT);
+ conf = new OzoneConfiguration();
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 5);
placementPolicy =
- new PipelinePlacementPolicy(nodeManager, new OzoneConfiguration());
+ new PipelinePlacementPolicy(nodeManager, conf);
}
@Test
@@ -179,7 +184,9 @@ public class TestPipelinePlacementPolicy {
}
int considerHeavyCount =
- ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT + 1;
+ conf.getInt(
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT,
+ ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT_DEFAULT) + 1;
Node2PipelineMap mockMap = new Node2PipelineMap();
for (DatanodeDetails node : nodes) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
index c583559..9bccb1a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineClose.java
@@ -169,7 +169,7 @@ public class TestPipelineClose {
new PipelineActionHandler(pipelineManager, conf);
pipelineActionHandler
.onMessage(pipelineActionsFromDatanode, new EventQueue());
- Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 1.2));
+ Thread.sleep((int) (pipelineDestroyTimeoutInMillis * 10));
OzoneContainer ozoneContainer =
cluster.getHddsDatanodes().get(0).getDatanodeStateMachine()
.getContainer();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
index 00144e4..0a8c5ad 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineProvider.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdds.scm.pipeline;
-import org.apache.commons.collections.CollectionUtils;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
@@ -33,6 +32,8 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Test for RatisPipelineProvider.
*/
@@ -46,14 +47,17 @@ public class TestRatisPipelineProvider {
public void init() throws Exception {
nodeManager = new MockNodeManager(true, 10);
stateManager = new PipelineStateManager(new OzoneConfiguration());
+ OzoneConfiguration conf = new OzoneConfiguration();
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
provider = new MockRatisPipelineProvider(nodeManager,
- stateManager, new OzoneConfiguration());
+ stateManager, conf);
}
private void createPipelineAndAssertions(
HddsProtos.ReplicationFactor factor) throws IOException {
Pipeline pipeline = provider.create(factor);
stateManager.addPipeline(pipeline);
+ nodeManager.addPipeline(pipeline);
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getPipelineState(),
@@ -61,10 +65,7 @@ public class TestRatisPipelineProvider {
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
Pipeline pipeline1 = provider.create(factor);
stateManager.addPipeline(pipeline1);
- // New pipeline should not overlap with the previous created pipeline
- Assert.assertTrue(
- CollectionUtils.intersection(pipeline.getNodes(), pipeline1.getNodes())
- .isEmpty());
+ nodeManager.addPipeline(pipeline1);
Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline1.getFactor(), factor);
Assert.assertEquals(pipeline1.getPipelineState(),
@@ -77,6 +78,7 @@ public class TestRatisPipelineProvider {
HddsProtos.ReplicationFactor factor = HddsProtos.ReplicationFactor.THREE;
Pipeline pipeline = provider.create(factor);
stateManager.addPipeline(pipeline);
+ nodeManager.addPipeline(pipeline);
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline.getFactor(), factor);
Assert.assertEquals(pipeline.getPipelineState(),
@@ -86,11 +88,7 @@ public class TestRatisPipelineProvider {
factor = HddsProtos.ReplicationFactor.ONE;
Pipeline pipeline1 = provider.create(factor);
stateManager.addPipeline(pipeline1);
- // New pipeline should overlap with the previous created pipeline,
- // and one datanode should overlap between the two types.
- Assert.assertEquals(
- CollectionUtils.intersection(pipeline.getNodes(),
- pipeline1.getNodes()).size(), 1);
+ nodeManager.addPipeline(pipeline1);
Assert.assertEquals(pipeline1.getType(), HddsProtos.ReplicationType.RATIS);
Assert.assertEquals(pipeline1.getFactor(), factor);
Assert.assertEquals(pipeline1.getPipelineState(),
@@ -154,6 +152,10 @@ public class TestRatisPipelineProvider {
.build();
stateManager.addPipeline(openPipeline);
+ nodeManager.addPipeline(openPipeline);
+ for (DatanodeDetails node : openPipeline.getNodes()) {
+ System.out.println("open pipeline contains " + node.getUuid());
+ }
// Use up next 3 DNs also for an open pipeline.
List<DatanodeDetails> moreOpenPiplineDns = nodeManager.getAllNodes()
@@ -166,6 +168,7 @@ public class TestRatisPipelineProvider {
.setId(PipelineID.randomId())
.build();
stateManager.addPipeline(anotherOpenPipeline);
+ nodeManager.addPipeline(anotherOpenPipeline);
// Use up next 3 DNs also for a closed pipeline.
List<DatanodeDetails> closedPiplineDns = nodeManager.getAllNodes()
@@ -178,6 +181,7 @@ public class TestRatisPipelineProvider {
.setId(PipelineID.randomId())
.build();
stateManager.addPipeline(anotherClosedPipeline);
+ nodeManager.addPipeline(anotherClosedPipeline);
Pipeline pipeline = provider.create(factor);
Assert.assertEquals(pipeline.getType(), HddsProtos.ReplicationType.RATIS);
@@ -187,12 +191,6 @@ public class TestRatisPipelineProvider {
Assert.assertEquals(pipeline.getNodes().size(), factor.getNumber());
List<DatanodeDetails> pipelineNodes = pipeline.getNodes();
- // Pipline nodes cannot be from open pipelines.
- Assert.assertTrue(
- pipelineNodes.parallelStream().filter(dn ->
- (openPiplineDns.contains(dn) || moreOpenPiplineDns.contains(dn)))
- .count() == 0);
-
// Since we have only 10 DNs, at least 1 pipeline node should have been
// from the closed pipeline DN list.
Assert.assertTrue(pipelineNodes.parallelStream().filter(
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 2a486b1..9d59960 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.pipeline;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
@@ -58,6 +60,7 @@ public class TestSCMPipelineManager {
@Before
public void setUp() throws Exception {
conf = new OzoneConfiguration();
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 1);
testDir = GenericTestUtils
.getTestDir(TestSCMPipelineManager.class.getSimpleName());
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
@@ -253,10 +256,8 @@ public class TestSCMPipelineManager {
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
Assert.fail();
- } catch (InsufficientDatanodesException idEx) {
- Assert.assertEquals(
- "Cannot create pipeline of factor 3 using 1 nodes.",
- idEx.getMessage());
+ } catch (SCMException idEx) {
+ // pipeline creation failed this time.
}
metrics = getMetrics(
@@ -266,7 +267,7 @@ public class TestSCMPipelineManager {
numPipelineCreateFailed = getLongCounter(
"NumPipelineCreationFailed", metrics);
- Assert.assertTrue(numPipelineCreateFailed == 0);
+ Assert.assertTrue(numPipelineCreateFailed == 1);
}
@Test
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
index 0aba968..19c1406 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneCluster.java
@@ -36,6 +36,8 @@ import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeoutException;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT;
+
/**
* Interface used for MiniOzoneClusters.
*/
@@ -269,6 +271,8 @@ public interface MiniOzoneCluster {
protected Builder(OzoneConfiguration conf) {
this.conf = conf;
+ // MiniOzoneCluster doesn't have pipeline engagement limit.
+ conf.setInt(OZONE_DATANODE_MAX_PIPELINE_ENGAGEMENT, 0);
setClusterId(UUID.randomUUID().toString());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: hdfs-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: hdfs-commits-help@hadoop.apache.org