You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@ozone.apache.org by av...@apache.org on 2021/05/03 17:44:18 UTC

[ozone] branch HDDS-3698-nonrolling-upgrade updated: HDDS-5170. Race condition in NodeStateManager#addNode allows datanodes with lower MLV to be used in pipelines. (#2204)

This is an automated email from the ASF dual-hosted git repository.

avijayan pushed a commit to branch HDDS-3698-nonrolling-upgrade
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-3698-nonrolling-upgrade by this push:
     new cdb1471  HDDS-5170. Race condition in NodeStateManager#addNode allows datanodes with lower MLV to be used in pipelines. (#2204)
cdb1471 is described below

commit cdb147144efca399087ffebb8c9f05a5684a56b4
Author: Ethan Rose <33...@users.noreply.github.com>
AuthorDate: Mon May 3 13:43:57 2021 -0400

    HDDS-5170. Race condition in NodeStateManager#addNode allows datanodes with lower MLV to be used in pipelines. (#2204)
---
 .../hadoop/hdds/scm/node/NodeStateManager.java     |  22 ++--
 .../hadoop/hdds/scm/node/TestNodeStateManager.java |  28 ++---
 .../hadoop/hdds/scm/node/TestSCMNodeManager.java   | 117 ++++++++++++++-------
 3 files changed, 106 insertions(+), 61 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
index ec5b349..d875989 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeStateManager.java
@@ -294,18 +294,11 @@ public class NodeStateManager implements Runnable, Closeable {
    */
   public void addNode(DatanodeDetails datanodeDetails,
       LayoutVersionProto layoutInfo) throws NodeAlreadyExistsException {
-    NodeStatus newNodeStatus = newNodeStatus(datanodeDetails);
+    NodeStatus newNodeStatus = newNodeStatus(datanodeDetails, layoutInfo);
     nodeStateMap.addNode(datanodeDetails, newNodeStatus, layoutInfo);
     UUID dnID = datanodeDetails.getUuid();
     try {
       updateLastKnownLayoutVersion(datanodeDetails, layoutInfo);
-      DatanodeInfo dnInfo = nodeStateMap.getNodeInfo(dnID);
-      NodeStatus status = nodeStateMap.getNodeStatus(dnID);
-
-      // State machine starts nodes as HEALTHY. If there is a layout
-      // mismatch, this node should be moved to HEALTHY_READONLY.
-      updateNodeLayoutVersionState(dnInfo, layoutMisMatchCondition, status,
-          NodeLifeCycleEvent.LAYOUT_MISMATCH);
     } catch (NodeNotFoundException ex) {
       LOG.error("Inconsistent NodeStateMap! Datanode with ID {} was " +
           "added but not found in  map: {}", dnID, nodeStateMap);
@@ -320,17 +313,24 @@ public class NodeStateManager implements Runnable, Closeable {
    * updated to reflect the datanode state.
    * @param dn DatanodeDetails reported by the datanode
    */
-  private NodeStatus newNodeStatus(DatanodeDetails dn) {
+  private NodeStatus newNodeStatus(DatanodeDetails dn,
+      LayoutVersionProto layoutInfo) {
     HddsProtos.NodeOperationalState dnOpState = dn.getPersistedOpState();
+    NodeState state = HEALTHY;
+
+    if (layoutMisMatchCondition.test(layoutInfo)) {
+      state = HEALTHY_READONLY;
+    }
+
     if (dnOpState != NodeOperationalState.IN_SERVICE) {
       LOG.info("Updating nodeOperationalState on registration as the " +
               "datanode has a persisted state of {} and expiry of {}",
           dnOpState, dn.getPersistedOpStateExpiryEpochSec());
-      return new NodeStatus(dnOpState, nodeHealthSM.getInitialState(),
+      return new NodeStatus(dnOpState, state,
           dn.getPersistedOpStateExpiryEpochSec());
     } else {
       return new NodeStatus(
-          NodeOperationalState.IN_SERVICE, nodeHealthSM.getInitialState());
+          NodeOperationalState.IN_SERVICE, state);
     }
   }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java
index 59abb2d..d5bfe86 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeStateManager.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.scm.node.states.NodeAlreadyExistsException;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.server.events.Event;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.container.upgrade.UpgradeUtils;
 import org.apache.hadoop.ozone.upgrade.LayoutVersionManager;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
@@ -95,11 +96,11 @@ public class TestNodeStateManager {
       throws NodeAlreadyExistsException, NodeNotFoundException {
     // Create a datanode, then add and retrieve it
     DatanodeDetails dn = generateDatanode();
-    nsm.addNode(dn, null);
+    nsm.addNode(dn, UpgradeUtils.defaultLayoutVersionProto());
     assertEquals(dn.getUuid(), nsm.getNode(dn).getUuid());
     // Now get the status of the newly added node and it should be
     // IN_SERVICE and HEALTHY
-    NodeStatus expectedState = NodeStatus.inServiceHealthyReadOnly();
+    NodeStatus expectedState = NodeStatus.inServiceHealthy();
     assertEquals(expectedState, nsm.getNodeStatus(dn));
   }
 
@@ -107,9 +108,9 @@ public class TestNodeStateManager {
   public void testGetAllNodesReturnsCorrectly()
       throws NodeAlreadyExistsException {
     DatanodeDetails dn = generateDatanode();
-    nsm.addNode(dn, null);
+    nsm.addNode(dn, UpgradeUtils.defaultLayoutVersionProto());
     dn = generateDatanode();
-    nsm.addNode(dn, null);
+    nsm.addNode(dn, UpgradeUtils.defaultLayoutVersionProto());
     assertEquals(2, nsm.getAllNodes().size());
     assertEquals(2, nsm.getTotalNodeCount());
   }
@@ -118,17 +119,17 @@ public class TestNodeStateManager {
   public void testGetNodeCountReturnsCorrectly()
       throws NodeAlreadyExistsException {
     DatanodeDetails dn = generateDatanode();
-    nsm.addNode(dn, null);
-    assertEquals(1, nsm.getNodes(NodeStatus.inServiceHealthyReadOnly()).size());
+    nsm.addNode(dn, UpgradeUtils.defaultLayoutVersionProto());
+    assertEquals(1, nsm.getNodes(NodeStatus.inServiceHealthy()).size());
     assertEquals(0, nsm.getNodes(NodeStatus.inServiceStale()).size());
   }
 
   @Test
   public void testGetNodeCount() throws NodeAlreadyExistsException {
     DatanodeDetails dn = generateDatanode();
-    nsm.addNode(dn, null);
+    nsm.addNode(dn, UpgradeUtils.defaultLayoutVersionProto());
     assertEquals(1, nsm.getNodeCount(
-        NodeStatus.inServiceHealthyReadOnly()));
+        NodeStatus.inServiceHealthy()));
     assertEquals(0, nsm.getNodeCount(NodeStatus.inServiceStale()));
   }
 
@@ -221,7 +222,7 @@ public class TestNodeStateManager {
   public void testNodeOpStateCanBeSet()
       throws NodeAlreadyExistsException, NodeNotFoundException {
     DatanodeDetails dn = generateDatanode();
-    nsm.addNode(dn, null);
+    nsm.addNode(dn, UpgradeUtils.defaultLayoutVersionProto());
 
     nsm.setNodeOperationalState(dn,
         HddsProtos.NodeOperationalState.DECOMMISSIONED);
@@ -229,25 +230,24 @@ public class TestNodeStateManager {
     NodeStatus newStatus = nsm.getNodeStatus(dn);
     assertEquals(HddsProtos.NodeOperationalState.DECOMMISSIONED,
         newStatus.getOperationalState());
-    assertEquals(NodeState.HEALTHY_READONLY,
-        newStatus.getHealth());
+    assertEquals(NodeState.HEALTHY, newStatus.getHealth());
   }
 
   @Test
   public void testHealthEventsFiredWhenOpStateChanged()
       throws NodeAlreadyExistsException, NodeNotFoundException {
     DatanodeDetails dn = generateDatanode();
-    nsm.addNode(dn, null);
+    nsm.addNode(dn, UpgradeUtils.defaultLayoutVersionProto());
 
     // First set the node to decommissioned, then run through all op states in
-    // order and ensure the non_healthy_to_healthy event gets fired
+    // order and ensure the healthy_readonly_to_healthy event gets fired
     nsm.setNodeOperationalState(dn,
         HddsProtos.NodeOperationalState.DECOMMISSIONED);
     for (HddsProtos.NodeOperationalState s :
         HddsProtos.NodeOperationalState.values()) {
       eventPublisher.clearEvents();
       nsm.setNodeOperationalState(dn, s);
-      assertEquals(SCMEvents.HEALTHY_READONLY_NODE,
+      assertEquals(SCMEvents.HEALTHY_READONLY_TO_HEALTHY_NODE,
           eventPublisher.getLastEvent());
     }
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 49b5dc9..946c765 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -21,7 +21,10 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ScheduledFuture;
@@ -40,6 +43,7 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
 import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto;
@@ -76,6 +80,7 @@ import org.junit.rules.ExpectedException;
 
 import java.util.Map;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
@@ -234,28 +239,36 @@ public class TestSCMNodeManager {
     try (SCMNodeManager nodeManager = createNodeManager(conf)) {
       // Register 2 nodes correctly.
       // These will be used with a faulty node to test pipeline creation.
-      TestUtils.createRandomDatanodeAndRegister(nodeManager);
-      TestUtils.createRandomDatanodeAndRegister(nodeManager);
+      DatanodeDetails goodNode1 =
+          TestUtils.createRandomDatanodeAndRegister(nodeManager);
+      DatanodeDetails goodNode2 =
+          TestUtils.createRandomDatanodeAndRegister(nodeManager);
 
       scm.exitSafeMode();
 
-      assertPipelineClosedAfterLayoutHeartbeat(nodeManager,
-          SMALLER_MLV_LAYOUT_PROTO);
-      assertPipelineClosedAfterLayoutHeartbeat(nodeManager,
-          LARGER_MLV_SLV_LAYOUT_PROTO);
-      assertPipelineClosedAfterLayoutHeartbeat(nodeManager,
-          SMALLER_MLV_SLV_LAYOUT_PROTO);
-      assertPipelineClosedAfterLayoutHeartbeat(nodeManager,
-          LARGER_SLV_LAYOUT_PROTO);
+      assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2,
+          nodeManager, SMALLER_MLV_LAYOUT_PROTO);
+      assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2,
+          nodeManager, LARGER_MLV_SLV_LAYOUT_PROTO);
+      assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2,
+          nodeManager, SMALLER_MLV_SLV_LAYOUT_PROTO);
+      assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2,
+          nodeManager, LARGER_SLV_LAYOUT_PROTO);
     }
   }
 
   private void assertPipelineClosedAfterLayoutHeartbeat(
+      DatanodeDetails originalNode1, DatanodeDetails originalNode2,
       SCMNodeManager nodeManager, LayoutVersionProto layout) throws Exception {
 
+    List<DatanodeDetails>  originalNodes =
+        Arrays.asList(originalNode1, originalNode2);
+
     // Initial condition: 2 healthy nodes registered.
-    assertPipelineCounts(oneCount -> oneCount == 2,
-        threeCount -> threeCount == 0);
+    assertPipelines(HddsProtos.ReplicationFactor.ONE, count -> count == 2,
+        originalNodes);
+    assertPipelines(HddsProtos.ReplicationFactor.THREE,
+        count -> count == 0, new ArrayList<>());
 
     // Even when safemode exit or new node addition trigger pipeline
     // creation, they will fail with not enough healthy nodes for ratis 3
@@ -267,17 +280,24 @@ public class TestSCMNodeManager {
     DatanodeDetails node = TestUtils
         .createRandomDatanodeAndRegister(nodeManager);
 
+    List<DatanodeDetails> allNodes = new ArrayList<>(originalNodes);
+    allNodes.add(node);
+
     // Safemode exit and adding the new node should trigger pipeline creation.
-    assertPipelineCounts(oneCount -> oneCount == 3,
-        threeCount -> threeCount >= 1);
+    assertPipelines(HddsProtos.ReplicationFactor.ONE, count -> count == 3,
+        allNodes);
+    assertPipelines(HddsProtos.ReplicationFactor.THREE, count -> count >= 1,
+        allNodes);
 
     // node sends incorrect layout.
     nodeManager.processHeartbeat(node, layout);
 
     // Its pipelines should be closed then removed, meaning there is not
     // enough nodes for factor 3 pipelines.
-    assertPipelineCounts(oneCount -> oneCount == 2,
-        threeCount -> threeCount == 0);
+    assertPipelines(HddsProtos.ReplicationFactor.ONE, count -> count == 2,
+        originalNodes);
+    assertPipelines(HddsProtos.ReplicationFactor.THREE,
+        count -> count == 0, new ArrayList<>());
 
     assertPipelineCreationFailsWithNotEnoughNodes(2);
   }
@@ -314,7 +334,8 @@ public class TestSCMNodeManager {
           SMALLER_MLV_LAYOUT_PROTO, success);
       // This node has correct MLV and SLV, so it can join and be used in
       // pipelines.
-      assertRegister(nodeManager, CORRECT_LAYOUT_PROTO, success);
+      DatanodeDetails goodNode = assertRegister(nodeManager,
+          CORRECT_LAYOUT_PROTO, success);
 
       Assert.assertEquals(3, nodeManager.getAllNodes().size());
 
@@ -322,8 +343,12 @@ public class TestSCMNodeManager {
 
       // SCM should auto create a factor 1 pipeline for the one healthy node.
       // Still should not have enough healthy nodes for ratis 3 pipeline.
-      assertPipelineCounts(oneCount -> oneCount == 1,
-          threeCount -> threeCount == 0);
+      assertPipelines(HddsProtos.ReplicationFactor.ONE,
+          count -> count == 1,
+          Collections.singletonList(goodNode));
+      assertPipelines(HddsProtos.ReplicationFactor.THREE,
+          count -> count == 0,
+          new ArrayList<>());
 
       // Even when safemode exit or new node addition trigger pipeline
       // creation, they will fail with not enough healthy nodes for ratis 3
@@ -337,8 +362,12 @@ public class TestSCMNodeManager {
 
       // After moving out of healthy readonly, pipeline creation should be
       // triggered.
-      assertPipelineCounts(oneCount -> oneCount == 3,
-          threeCount -> threeCount >= 1);
+      assertPipelines(HddsProtos.ReplicationFactor.ONE,
+          count -> count == 3,
+          Arrays.asList(badMlvNode1, badMlvNode2, goodNode));
+      assertPipelines(HddsProtos.ReplicationFactor.THREE,
+          count -> count >= 1,
+          Arrays.asList(badMlvNode1, badMlvNode2, goodNode));
     }
   }
 
@@ -365,21 +394,37 @@ public class TestSCMNodeManager {
     }
   }
 
-  private void assertPipelineCounts(Predicate<Integer> factorOneCheck,
-      Predicate<Integer> factorThreeCheck) throws Exception {
-    LambdaTestUtils.await(5000, 1000, () -> {
-      int numOne = scm.getPipelineManager()
-          .getPipelines(HddsProtos.ReplicationType.RATIS,
-              HddsProtos.ReplicationFactor.ONE).size();
-      int numThree = scm.getPipelineManager()
-          .getPipelines(HddsProtos.ReplicationType.RATIS,
-              HddsProtos.ReplicationFactor.THREE).size();
-
-      // Moving nodes out of healthy readonly should have triggered
-      // pipeline creation. With 3 healthy nodes, we should have at least one
-      // factor 3 pipeline now, and factor 1s should have been auto created
-      // for all nodes.
-      return factorOneCheck.test(numOne) && factorThreeCheck.test(numThree);
+  private void assertPipelines(HddsProtos.ReplicationFactor factor,
+      Predicate<Integer> countCheck, Collection<DatanodeDetails> allowedDNs)
+      throws Exception {
+
+    Set<String> allowedDnIds = allowedDNs.stream()
+        .map(DatanodeDetails::getUuidString)
+        .collect(Collectors.toSet());
+
+    LambdaTestUtils.await(10000, 1000, () -> {
+
+      // Make sure that none of these pipelines use nodes outside of allowedDNs.
+      List<Pipeline> pipelines = scm.getPipelineManager()
+          .getPipelines(HddsProtos.ReplicationType.RATIS, factor);
+
+      for (Pipeline pipeline: pipelines) {
+        for(DatanodeDetails pipelineDN: pipeline.getNodes()) {
+          // Do not wait for this condition to be true. Unhealthy DNs should
+          // never be used.
+          if (!allowedDnIds.contains(pipelineDN.getUuidString())) {
+            String message = String.format("Pipeline %s used datanode %s " +
+                "which is not in the set of allowed datanodes: %s",
+                pipeline.getId().toString(), pipelineDN.getUuidString(),
+                allowedDnIds.toString());
+
+            Assert.fail(message);
+          }
+        }
+      }
+
+      // Wait for the expected number of pipelines using allowed DNs.
+      return countCheck.test(pipelines.size());
     });
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org