You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by bh...@apache.org on 2020/04/17 04:18:40 UTC

[hadoop-ozone] branch master updated: HDDS-3322. StandAlone Pipelines are created in an infinite loop (#749)

This is an automated email from the ASF dual-hosted git repository.

bharat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f37a25  HDDS-3322. StandAlone Pipelines are created in an infinite loop (#749)
2f37a25 is described below

commit 2f37a25bf02518bb4484b608f803c6ce4a4656d4
Author: Hanisha Koneru <ha...@apache.org>
AuthorDate: Thu Apr 16 21:18:32 2020 -0700

    HDDS-3322. StandAlone Pipelines are created in an infinite loop (#749)
---
 .../scm/pipeline/BackgroundPipelineCreator.java    |  9 ++-
 .../hadoop/hdds/scm/pipeline/PipelineFactory.java  |  2 +-
 .../hadoop/hdds/scm/pipeline/PipelineProvider.java | 75 +++++++++++++++++++---
 .../hdds/scm/pipeline/RatisPipelineProvider.java   | 61 ++++--------------
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  2 +
 .../hdds/scm/pipeline/SimplePipelineProvider.java  | 15 ++---
 .../scm/pipeline/TestSimplePipelineProvider.java   |  2 +-
 .../ozone/recon/scm/ReconPipelineFactory.java      |  4 +-
 8 files changed, 101 insertions(+), 69 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
index b8f0fb6..001d185 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreator.java
@@ -91,8 +91,13 @@ class BackgroundPipelineCreator {
   private boolean skipCreation(HddsProtos.ReplicationFactor factor,
                                HddsProtos.ReplicationType type,
                                boolean autoCreate) {
-    return factor == HddsProtos.ReplicationFactor.ONE &&
-        type == HddsProtos.ReplicationType.RATIS && (!autoCreate);
+    if (type == HddsProtos.ReplicationType.RATIS) {
+      return factor == HddsProtos.ReplicationFactor.ONE && (!autoCreate);
+    } else {
+      // For STAND_ALONE Replication Type, Replication Factor 3 should not be
+      // used.
+      return factor == HddsProtos.ReplicationFactor.THREE;
+    }
   }
 
   private void createPipelines() {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
index 58a8fd7..9e5353a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineFactory.java
@@ -43,7 +43,7 @@ public class PipelineFactory {
       Configuration conf, EventPublisher eventPublisher) {
     providers = new HashMap<>();
     providers.put(ReplicationType.STAND_ALONE,
-        new SimplePipelineProvider(nodeManager));
+        new SimplePipelineProvider(nodeManager, stateManager));
     providers.put(ReplicationType.RATIS,
         new RatisPipelineProvider(nodeManager, stateManager, conf,
             eventPublisher));
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
index c00ff78..533f77e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineProvider.java
@@ -18,22 +18,81 @@
 
 package org.apache.hadoop.hdds.scm.pipeline;
 
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
 
 /**
  * Interface for creating pipelines.
  */
-public interface PipelineProvider {
+public abstract class PipelineProvider {
+
+  private final NodeManager nodeManager;
+  private final PipelineStateManager stateManager;
+
+  public PipelineProvider(NodeManager nodeManager,
+      PipelineStateManager stateManager) {
+    this.nodeManager = nodeManager;
+    this.stateManager = stateManager;
+  }
+
+  public PipelineProvider() {
+    this.nodeManager = null;
+    this.stateManager = null;
+  }
+
+  public NodeManager getNodeManager() {
+    return nodeManager;
+  }
+
+  public PipelineStateManager getPipelineStateManager() {
+    return stateManager;
+  }
+
+  protected abstract Pipeline create(ReplicationFactor factor)
+      throws IOException;
+
+  protected abstract Pipeline create(ReplicationFactor factor,
+      List<DatanodeDetails> nodes);
 
-  Pipeline create(ReplicationFactor factor) throws IOException;
+  protected abstract void close(Pipeline pipeline) throws IOException;
 
-  Pipeline create(ReplicationFactor factor, List<DatanodeDetails> nodes);
+  protected abstract void shutdown();
 
-  void close(Pipeline pipeline) throws IOException;
+  List<DatanodeDetails> pickNodesNeverUsed(ReplicationType type,
+      ReplicationFactor factor) throws SCMException {
+    Set<DatanodeDetails> dnsUsed = new HashSet<>();
+    stateManager.getPipelines(type, factor).stream().filter(
+        p -> p.getPipelineState().equals(Pipeline.PipelineState.OPEN) ||
+            p.getPipelineState().equals(Pipeline.PipelineState.DORMANT) ||
+            p.getPipelineState().equals(Pipeline.PipelineState.ALLOCATED))
+        .forEach(p -> dnsUsed.addAll(p.getNodes()));
 
-  void shutdown();
+    // Get list of healthy nodes
+    List<DatanodeDetails> dns = nodeManager
+        .getNodes(HddsProtos.NodeState.HEALTHY)
+        .parallelStream()
+        .filter(dn -> !dnsUsed.contains(dn))
+        .limit(factor.getNumber())
+        .collect(Collectors.toList());
+    if (dns.size() < factor.getNumber()) {
+      String e = String
+          .format("Cannot create pipeline of factor %d using %d nodes." +
+                  " Used %d nodes. Healthy nodes %d", factor.getNumber(),
+              dns.size(), dnsUsed.size(),
+              nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size());
+      throw new SCMException(e,
+          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+    }
+    return dns;
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index ad100ab..9d7c996 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -36,21 +36,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 /**
  * Implements Api for creating ratis pipelines.
  */
-public class RatisPipelineProvider implements PipelineProvider {
+public class RatisPipelineProvider extends PipelineProvider {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(RatisPipelineProvider.class);
 
-  private final NodeManager nodeManager;
-  private final PipelineStateManager stateManager;
   private final Configuration conf;
   private final EventPublisher eventPublisher;
   private final PipelinePlacementPolicy placementPolicy;
@@ -60,8 +55,7 @@ public class RatisPipelineProvider implements PipelineProvider {
   RatisPipelineProvider(NodeManager nodeManager,
       PipelineStateManager stateManager, Configuration conf,
       EventPublisher eventPublisher) {
-    this.nodeManager = nodeManager;
-    this.stateManager = stateManager;
+    super(nodeManager, stateManager);
     this.conf = conf;
     this.eventPublisher = eventPublisher;
     this.placementPolicy =
@@ -74,35 +68,6 @@ public class RatisPipelineProvider implements PipelineProvider {
         ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT);
   }
 
-  private List<DatanodeDetails> pickNodesNeverUsed(ReplicationFactor factor)
-      throws SCMException {
-    Set<DatanodeDetails> dnsUsed = new HashSet<>();
-    stateManager.getPipelines(ReplicationType.RATIS, factor)
-        .stream().filter(
-          p -> p.getPipelineState().equals(PipelineState.OPEN) ||
-              p.getPipelineState().equals(PipelineState.DORMANT) ||
-              p.getPipelineState().equals(PipelineState.ALLOCATED))
-        .forEach(p -> dnsUsed.addAll(p.getNodes()));
-
-    // Get list of healthy nodes
-    List<DatanodeDetails> dns = nodeManager
-        .getNodes(HddsProtos.NodeState.HEALTHY)
-        .parallelStream()
-        .filter(dn -> !dnsUsed.contains(dn))
-        .limit(factor.getNumber())
-        .collect(Collectors.toList());
-    if (dns.size() < factor.getNumber()) {
-      String e = String
-          .format("Cannot create pipeline of factor %d using %d nodes." +
-                  " Used %d nodes. Healthy nodes %d", factor.getNumber(),
-              dns.size(), dnsUsed.size(),
-              nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size());
-      throw new SCMException(e,
-          SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
-    }
-    return dns;
-  }
-
   private boolean exceedPipelineNumberLimit(ReplicationFactor factor) {
     if (factor != ReplicationFactor.THREE) {
       // Only put limits for Factor THREE pipelines.
@@ -110,20 +75,22 @@ public class RatisPipelineProvider implements PipelineProvider {
     }
     // Per datanode limit
     if (maxPipelinePerDatanode > 0) {
-      return (stateManager.getPipelines(ReplicationType.RATIS, factor).size() -
-          stateManager.getPipelines(ReplicationType.RATIS, factor,
-              Pipeline.PipelineState.CLOSED).size()) > maxPipelinePerDatanode *
-          nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY) /
+      return (getPipelineStateManager().getPipelines(
+          ReplicationType.RATIS, factor).size() -
+          getPipelineStateManager().getPipelines(ReplicationType.RATIS, factor,
+              PipelineState.CLOSED).size()) > maxPipelinePerDatanode *
+          getNodeManager().getNodeCount(HddsProtos.NodeState.HEALTHY) /
           factor.getNumber();
     }
 
     // Global limit
     if (pipelineNumberLimit > 0) {
-      return (stateManager.getPipelines(ReplicationType.RATIS,
-          ReplicationFactor.THREE).size() - stateManager.getPipelines(
-          ReplicationType.RATIS, ReplicationFactor.THREE,
-          Pipeline.PipelineState.CLOSED).size()) >
-          (pipelineNumberLimit - stateManager.getPipelines(
+      return (getPipelineStateManager().getPipelines(ReplicationType.RATIS,
+          ReplicationFactor.THREE).size() -
+          getPipelineStateManager().getPipelines(
+              ReplicationType.RATIS, ReplicationFactor.THREE,
+              PipelineState.CLOSED).size()) >
+          (pipelineNumberLimit - getPipelineStateManager().getPipelines(
               ReplicationType.RATIS, ReplicationFactor.ONE).size());
     }
 
@@ -143,7 +110,7 @@ public class RatisPipelineProvider implements PipelineProvider {
 
     switch(factor) {
     case ONE:
-      dns = pickNodesNeverUsed(ReplicationFactor.ONE);
+      dns = pickNodesNeverUsed(ReplicationType.RATIS, ReplicationFactor.ONE);
       break;
     case THREE:
       dns = placementPolicy.chooseDatanodes(null,
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 7c58bec..26908b1 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -237,6 +237,8 @@ public class SCMPipelineManager implements PipelineManager {
       recordMetricsForPipeline(pipeline);
       return pipeline;
     } catch (IOException ex) {
+      LOG.error("Failed to create pipeline of type {} and factor {}. " +
+          "Exception: {}", type, factor, ex.getMessage());
       metrics.incNumPipelineCreationFailed();
       throw ex;
     } finally {
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
index a772a97..c7b6305 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SimplePipelineProvider.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdds.scm.pipeline;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState;
 
@@ -32,18 +31,18 @@ import java.util.List;
 /**
  * Implements Api for creating stand alone pipelines.
  */
-public class SimplePipelineProvider implements PipelineProvider {
+public class SimplePipelineProvider extends PipelineProvider {
 
-  private final NodeManager nodeManager;
-
-  public SimplePipelineProvider(NodeManager nodeManager) {
-    this.nodeManager = nodeManager;
+  public SimplePipelineProvider(NodeManager nodeManager,
+      PipelineStateManager stateManager) {
+    super(nodeManager, stateManager);
   }
 
   @Override
   public Pipeline create(ReplicationFactor factor) throws IOException {
-    List<DatanodeDetails> dns =
-        nodeManager.getNodes(NodeState.HEALTHY);
+    List<DatanodeDetails> dns = pickNodesNeverUsed(ReplicationType.STAND_ALONE,
+        factor);
+
     if (dns.size() < factor.getNumber()) {
       String e = String
           .format("Cannot create pipeline of factor %d using %d nodes.",
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
index d9a42f2..fe3fb79 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSimplePipelineProvider.java
@@ -44,7 +44,7 @@ public class TestSimplePipelineProvider {
   public void init() throws Exception {
     nodeManager = new MockNodeManager(true, 10);
     stateManager = new PipelineStateManager();
-    provider = new SimplePipelineProvider(nodeManager);
+    provider = new SimplePipelineProvider(nodeManager, stateManager);
   }
 
   @Test
diff --git a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
index 1ab037a..2813301 100644
--- a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
+++ b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineFactory.java
@@ -39,7 +39,7 @@ public class ReconPipelineFactory extends PipelineFactory {
     setProviders(new DefaultedMap(reconMockPipelineProvider));
   }
 
-  static class ReconPipelineProvider implements PipelineProvider {
+  static class ReconPipelineProvider extends PipelineProvider {
 
     @Override
     public Pipeline create(HddsProtos.ReplicationFactor factor){
@@ -63,7 +63,7 @@ public class ReconPipelineFactory extends PipelineFactory {
 
     @Override
     public void shutdown() {
-
+      // Do nothing
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org