You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by bh...@apache.org on 2020/04/17 04:18:40 UTC
[hadoop-ozone] branch master updated: HDDS-3322. StandAlone Pipelines are created in an infinite loop (#749)
This is an automated email from the ASF dual-hosted git repository.
bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 2f37a25 HDDS-3322. StandAlone Pipelines are created in an infinite loop (#749)
2f37a25 is described below
commit 2f37a25bf02518bb4484b608f803c6ce4a4656d4
Author: Hanisha Koneru <ha...@apache.org>
AuthorDate: Thu Apr 16 21:18:32 2020 -0700
HDDS-3322. StandAlone Pipelines are created in an infinite loop (#749)
---
.../scm/pipeline/BackgroundPipelineCreator.java | 9 ++-
.../hadoop/hdds/scm/pipeline/PipelineFactory.java | 2 +-
.../hadoop/hdds/scm/pipeline/PipelineProvider.java | 75 +++++++++++++++++++---
.../hdds/scm/pipeline/RatisPipelineProvider.java | 61 ++++--------------
.../hdds/scm/pipeline/SCMPipelineManager.java | 2 +
.../hdds/scm/pipeline/SimplePipelineProvider.java | 15 ++---
.../scm/pipeline/TestSimplePipelineProvider.java | 2 +-
.../ozone/recon/scm/ReconPipelineFactory.java | 4 +-
8 files changed, 101 insertions(+), 69 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index b8f0fb6..001d185 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -91,8 +91,13 @@ class BackgroundPipelineCreator {
private boolean skipCreation(HddsProtos.ReplicationFactor factor,
HddsProtos.ReplicationType type,
boolean autoCreate) {
- return factor == HddsProtos.ReplicationFactor.ONE &&
- type == HddsProtos.ReplicationType.RATIS && (!autoCreate);
+ if (type == HddsProtos.ReplicationType.RATIS) {
+ return factor == HddsProtos.ReplicationFactor.ONE && (!autoCreate);
+ } else {
+ // For STAND_ALONE Replication Type, Replication Factor 3 should not be
+ // used.
+ return factor == HddsProtos.ReplicationFactor.THREE;
+ }
}
private void createPipelines() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 58a8fd7..9e5353a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -43,7 +43,7 @@ public class PipelineFactory {
Configuration conf, EventPublisher eventPublisher) {
providers = new HashMap<>();
providers.put(ReplicationType.STAND_ALONE,
- new SimplePipelineProvider(nodeManager));
+ new SimplePipelineProvider(nodeManager, stateManager));
providers.put(ReplicationType.RATIS,
new RatisPipelineProvider(nodeManager, stateManager, conf,
eventPublisher));
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index c00ff78..533f77e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -18,22 +18,81 @@
package org.apache.hadoop.hdds.scm.pipeline;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-
import java.io.IOException;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
/**
* Interface for creating pipelines.
*/
-public interface PipelineProvider {
+public abstract class PipelineProvider {
+
+ private final NodeManager nodeManager;
+ private final PipelineStateManager stateManager;
+
+ public PipelineProvider(NodeManager nodeManager,
+ PipelineStateManager stateManager) {
+ this.nodeManager = nodeManager;
+ this.stateManager = stateManager;
+ }
+
+ public PipelineProvider() {
+ this.nodeManager = null;
+ this.stateManager = null;
+ }
+
+ public NodeManager getNodeManager() {
+ return nodeManager;
+ }
+
+ public PipelineStateManager getPipelineStateManager() {
+ return stateManager;
+ }
+
+ protected abstract Pipeline create(ReplicationFactor factor)
+ throws IOException;
+
+ protected abstract Pipeline create(ReplicationFactor factor,
+ List<DatanodeDetails> nodes);
- Pipeline create(ReplicationFactor factor) throws IOException;
+ protected abstract void close(Pipeline pipeline) throws IOException;
- Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);
+ protected abstract void shutdown();
- void close(Pipeline pipeline) throws IOException;
+ List<DatanodeDetails> pickNodesNeverUsed(ReplicationType type,
+ ReplicationFactor factor) throws SCMException {
+ Set<DatanodeDetails> dnsUsed = new HashSet<>();
+ stateManager.getPipelines(type, factor).stream().filter(
+ p -> p.getPipelineState().equals(Pipeline.PipelineState.OPEN) ||
+ p.getPipelineState().equals(Pipeline.PipelineState.DORMANT) ||
+ p.getPipelineState().equals(Pipeline.PipelineState.ALLOCATED))
+ .forEach(p -> dnsUsed.addAll(p.getNodes()));
- void shutdown();
+ // Get list of healthy nodes
+ List<DatanodeDetails> dns = nodeManager
+ .getNodes(HddsProtos.NodeState.HEALTHY)
+ .parallelStream()
+ .filter(dn -> !dnsUsed.contains(dn))
+ .limit(factor.getNumber())
+ .collect(Collectors.toList());
+ if (dns.size() < factor.getNumber()) {
+ String e = String
+ .format("Cannot create pipeline of factor %d using %d nodes." +
+ " Used %d nodes. Healthy nodes %d", factor.getNumber(),
+ dns.size(), dnsUsed.size(),
+ nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size());
+ throw new SCMException(e,
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+ return dns;
+ }
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index ad100ab..9d7c996 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -36,21 +36,16 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
/**
* Implements Api for creating ratis pipelines.
*/
-public class RatisPipelineProvider implements PipelineProvider {
+public class RatisPipelineProvider extends PipelineProvider {
private static final Logger LOG =
LoggerFactory.getLogger(RatisPipelineProvider.class);
- private final NodeManager nodeManager;
- private final PipelineStateManager stateManager;
private final Configuration conf;
private final EventPublisher eventPublisher;
private final PipelinePlacementPolicy placementPolicy;
@@ -60,8 +55,7 @@ public class RatisPipelineProvider implements PipelineProvider {
RatisPipelineProvider(NodeManager nodeManager,
PipelineStateManager stateManager, Configuration conf,
EventPublisher eventPublisher) {
- this.nodeManager = nodeManager;
- this.stateManager = stateManager;
+ super(nodeManager, stateManager);
this.conf = conf;
this.eventPublisher = eventPublisher;
this.placementPolicy =
@@ -74,35 +68,6 @@ public class RatisPipelineProvider implements PipelineProvider {
ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
}
- private List<DatanodeDetails> pickNodesNeverUsed(ReplicationFactor factor)
- throws SCMException {
- Set<DatanodeDetails> dnsUsed = new HashSet<>();
- stateManager.getPipelines(ReplicationType.RATIS, factor)
- .stream().filter(
- p -> p.getPipelineState().equals(PipelineState.OPEN) ||
- p.getPipelineState().equals(PipelineState.DORMANT) ||
- p.getPipelineState().equals(PipelineState.ALLOCATED))
- .forEach(p -> dnsUsed.addAll(p.getNodes()));
-
- // Get list of healthy nodes
- List<DatanodeDetails> dns = nodeManager
- .getNodes(HddsProtos.NodeState.HEALTHY)
- .parallelStream()
- .filter(dn -> !dnsUsed.contains(dn))
- .limit(factor.getNumber())
- .collect(Collectors.toList());
- if (dns.size() < factor.getNumber()) {
- String e = String
- .format("Cannot create pipeline of factor %d using %d nodes." +
- " Used %d nodes. Healthy nodes %d", factor.getNumber(),
- dns.size(), dnsUsed.size(),
- nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size());
- throw new SCMException(e,
- SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
- }
- return dns;
- }
-
private boolean exceedPipelineNumberLimit(ReplicationFactor factor) {
if (factor != ReplicationFactor.THREE) {
// Only put limits for Factor THREE pipelines.
@@ -110,20 +75,22 @@ public class RatisPipelineProvider implements PipelineProvider {
}
// Per datanode limit
if (maxPipelinePerDatanode > 0) {
- return (stateManager.getPipelines(ReplicationType.RATIS, factor).size() -
- stateManager.getPipelines(ReplicationType.RATIS, factor,
- Pipeline.PipelineState.CLOSED).size()) > maxPipelinePerDatanode *
- nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY) /
+ return (getPipelineStateManager().getPipelines(
+ ReplicationType.RATIS, factor).size() -
+ getPipelineStateManager().getPipelines(ReplicationType.RATIS, factor,
+ PipelineState.CLOSED).size()) > maxPipelinePerDatanode *
+ getNodeManager().getNodeCount(HddsProtos.NodeState.HEALTHY) /
factor.getNumber();
}
// Global limit
if (pipelineNumberLimit > 0) {
- return (stateManager.getPipelines(ReplicationType.RATIS,
- ReplicationFactor.THREE).size() - stateManager.getPipelines(
- ReplicationType.RATIS, ReplicationFactor.THREE,
- Pipeline.PipelineState.CLOSED).size()) >
- (pipelineNumberLimit - stateManager.getPipelines(
+ return (getPipelineStateManager().getPipelines(ReplicationType.RATIS,
+ ReplicationFactor.THREE).size() -
+ getPipelineStateManager().getPipelines(
+ ReplicationType.RATIS, ReplicationFactor.THREE,
+ PipelineState.CLOSED).size()) >
+ (pipelineNumberLimit - getPipelineStateManager().getPipelines(
ReplicationType.RATIS, ReplicationFactor.ONE).size());
}
@@ -143,7 +110,7 @@ public class RatisPipelineProvider implements PipelineProvider {
switch(factor) {
case ONE:
- dns = pickNodesNeverUsed(ReplicationFactor.ONE);
+ dns = pickNodesNeverUsed(ReplicationType.RATIS, ReplicationFactor.ONE);
break;
case THREE:
dns = placementPolicy.chooseDatanodes(null,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 7c58bec..26908b1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -237,6 +237,8 @@ public class SCMPipelineManager implements PipelineManager {
recordMetricsForPipeline(pipeline);
return pipeline;
} catch (IOException ex) {
+ LOG.error("Failed to create pipeline of type {} and factor {}. " +
+ "Exception: {}", type, factor, ex.getMessage());
metrics.incNumPipelineCreationFailed();
throw ex;
} finally {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index a772a97..c7b6305 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdds.scm.pipeline;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
@@ -32,18 +31,18 @@ import java.util.List;
/**
* Implements Api for creating stand alone pipelines.
*/
-public class SimplePipelineProvider implements PipelineProvider {
+public class SimplePipelineProvider extends PipelineProvider {
- private final NodeManager nodeManager;
-
- public SimplePipelineProvider(NodeManager nodeManager) {
- this.nodeManager = nodeManager;
+ public SimplePipelineProvider(NodeManager nodeManager,
+ PipelineStateManager stateManager) {
+ super(nodeManager, stateManager);
}
@Override
public Pipeline create(ReplicationFactor factor) throws IOException {
- List<DatanodeDetails> dns =
- nodeManager.getNodes(NodeState.HEALTHY);
+ List<DatanodeDetails> dns = pickNodesNeverUsed(ReplicationType.STAND_ALONE,
+ factor);
+
if (dns.size() < factor.getNumber()) {
String e = String
.format("Cannot create pipeline of factor %d using %d nodes.",
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
index d9a42f2..fe3fb79 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
@@ -44,7 +44,7 @@ public class TestSimplePipelineProvider {
public void init() throws Exception {
nodeManager = new MockNodeManager(true, 10);
stateManager = new PipelineStateManager();
- provider = new SimplePipelineProvider(nodeManager);
+ provider = new SimplePipelineProvider(nodeManager, stateManager);
}
@Test
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
index 1ab037a..2813301 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
@@ -39,7 +39,7 @@ public class ReconPipelineFactory extends PipelineFactory {
setProviders(new DefaultedMap(reconMockPipelineProvider));
}
- static class ReconPipelineProvider implements PipelineProvider {
+ static class ReconPipelineProvider extends PipelineProvider {
@Override
public Pipeline create(HddsProtos.ReplicationFactor factor){
@@ -63,7 +63,7 @@ public class ReconPipelineFactory extends PipelineFactory {
@Override
public void shutdown() {
-
+ // Do nothing
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org