Posted to commits@ozone.apache.org by um...@apache.org on 2022/09/26 19:41:57 UTC

[ozone] branch master updated: HDDS-7039. EC: Handle the placement policy check in ECUnderReplicationHandler (#3645)

This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new c7038e08a8 HDDS-7039. EC: Handle the placement policy check in ECUnderReplicationHandler (#3645)
c7038e08a8 is described below

commit c7038e08a8df296108280f3759be55e7cbbf1d78
Author: Swaminathan Balachandran <47...@users.noreply.github.com>
AuthorDate: Mon Sep 26 12:41:51 2022 -0700

    HDDS-7039. EC: Handle the placement policy check in ECUnderReplicationHandler (#3645)
---
 .../hadoop/hdds/protocol/DatanodeDetails.java      |  11 ++
 .../replication/ECUnderReplicationHandler.java     | 118 +++++++++++++--------
 .../replication/TestECUnderReplicationHandler.java |  22 +++-
 3 files changed, 103 insertions(+), 48 deletions(-)
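
The core of the change is a placement gate: before any reconstruction or replication command is queued, the handler now validates that the existing replica nodes plus the newly selected targets still satisfy the configured PlacementPolicy. A minimal standalone sketch of that check follows; only validateContainerPlacement and isPolicySatisfied come from the patch below, while the wrapper class itself is a hypothetical illustration, not part of the commit.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hdds.protocol.DatanodeDetails;
    import org.apache.hadoop.hdds.scm.PlacementPolicy;

    // Hypothetical wrapper, used only to illustrate the gating check that
    // ECUnderReplicationHandler gains in this commit.
    final class PlacementGateSketch {
      private final PlacementPolicy containerPlacement;

      PlacementGateSketch(PlacementPolicy containerPlacement) {
        this.containerPlacement = containerPlacement;
      }

      // True only if the existing replica nodes plus the proposed targets
      // still satisfy the placement policy.
      boolean placementOk(List<DatanodeDetails> replicaNodes,
                          List<DatanodeDetails> selectedNodes) {
        List<DatanodeDetails> all = new ArrayList<>(replicaNodes);
        all.addAll(selectedNodes);
        return containerPlacement
            .validateContainerPlacement(all, all.size())
            .isPolicySatisfied();
      }
    }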

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index 25826f3e23..32358ef40a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -234,6 +234,17 @@ public class DatanodeDetails extends NodeImpl implements
     }
   }
 
+  /**
+   * Checks whether the node is decommissioned or decommissioning.
+   * @return true if the persisted OperationalState is DECOMMISSIONED or DECOMMISSIONING.
+   */
+  public boolean isDecomissioned() {
+    return this.getPersistedOpState() ==
+            HddsProtos.NodeOperationalState.DECOMMISSIONED ||
+            this.getPersistedOpState() ==
+            HddsProtos.NodeOperationalState.DECOMMISSIONING;
+  }
+
   /**
    * Set the persistedOpState for this instance.
    *
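
The new DatanodeDetails#isDecomissioned helper (spelled as in the code) reports true for nodes whose persisted operational state is DECOMMISSIONED or DECOMMISSIONING. A small usage sketch, assuming a list of replica-holding nodes is already at hand; the class and method names below are illustrative only and not part of the patch.

    import java.util.List;
    import java.util.stream.Collectors;

    import org.apache.hadoop.hdds.protocol.DatanodeDetails;

    // Illustrative helper: drop nodes that are being (or have been)
    // decommissioned before treating them as stable replica holders.
    final class DecommissionFilterSketch {
      static List<DatanodeDetails> excludeDecommissioned(
          List<DatanodeDetails> nodes) {
        return nodes.stream()
            .filter(dn -> !dn.isDecomissioned())
            .collect(Collectors.toList());
      }
    }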
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 5feb1600ea..8c3f763741 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -74,6 +75,22 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
     this.nodeManager = nodeManager;
   }
 
+  private boolean validatePlacement(List<DatanodeDetails> replicaNodes,
+                                    List<DatanodeDetails> selectedNodes) {
+    List<DatanodeDetails> nodes = new ArrayList<>(replicaNodes);
+    nodes.addAll(selectedNodes);
+    boolean placementStatus = containerPlacement
+            .validateContainerPlacement(nodes, nodes.size())
+            .isPolicySatisfied();
+    if (!placementStatus) {
+      LOG.warn("Selected Nodes does not satisfy placement policy: {}. " +
+              "Selected nodes: {}. Existing Replica Nodes: {}.",
+              containerPlacement.getClass().getName(),
+              selectedNodes, replicaNodes);
+    }
+    return placementStatus;
+  }
+
   /**
    * Identify a new set of datanode(s) to reconstruct the container and form the
    * SCM command to send it to DN. In the case of decommission, it will just
@@ -102,17 +119,14 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
     final ECContainerReplicaCount replicaCount =
         new ECContainerReplicaCount(container, replicas, pendingOps,
             remainingMaintenanceRedundancy);
-
     ContainerCheckRequest request = new ContainerCheckRequest.Builder()
         .setContainerInfo(container)
         .setContainerReplicas(replicas)
         .setPendingOps(pendingOps)
         .setMaintenanceRedundancy(remainingMaintenanceRedundancy)
         .build();
-
     ContainerHealthResult currentUnderRepRes = ecReplicationCheck
         .checkHealth(request);
-
     LOG.debug("Handling under-replicated EC container: {}", container);
     if (currentUnderRepRes
         .getHealthState() != ContainerHealthResult.HealthState
@@ -122,7 +136,6 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
           container.getContainerID(), currentUnderRepRes);
       return emptyMap();
     }
-
     // don't place reconstructed replicas on exclude nodes, since they already
     // have replicas
     List<DatanodeDetails> excludedNodes = replicas.stream()
@@ -153,37 +166,48 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
         }
       }
       List<Integer> missingIndexes = replicaCount.unavailableIndexes(true);
+      Map<Integer, Pair<ContainerReplica, NodeStatus>> sources =
+              filterSources(replicas, deletionInFlight);
+      List<DatanodeDetails> nodes =
+              sources.values().stream().map(Pair::getLeft)
+                      .map(ContainerReplica::getDatanodeDetails)
+                      .filter(datanodeDetails ->
+                              datanodeDetails.getPersistedOpState() ==
+                              HddsProtos.NodeOperationalState.IN_SERVICE ||
+                              datanodeDetails.isDecomissioned())
+                      .collect(Collectors.toList());
       // We got the missing indexes; these already exclude any decommissioning
       // indexes. Find the good source nodes.
       if (missingIndexes.size() > 0) {
-        Map<Integer, Pair<ContainerReplica, NodeStatus>> sources =
-            filterSources(replicas, deletionInFlight);
+
         LOG.debug("Missing indexes detected for the container {}." +
                 " The missing indexes are {}", id, missingIndexes);
         // We have source nodes.
         if (sources.size() >= repConfig.getData()) {
           final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
               excludedNodes, container, missingIndexes.size());
-          // any further processing shouldn't include these nodes as targets
-          excludedNodes.addAll(selectedDatanodes);
 
-          List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
-              sourceDatanodesWithIndex = new ArrayList<>();
-          for (Pair<ContainerReplica, NodeStatus> src : sources.values()) {
-            sourceDatanodesWithIndex.add(
-                new ReconstructECContainersCommand
-                    .DatanodeDetailsAndReplicaIndex(
-                    src.getLeft().getDatanodeDetails(),
-                    src.getLeft().getReplicaIndex()));
-          }
+          if (validatePlacement(nodes, selectedDatanodes)) {
+            excludedNodes.addAll(selectedDatanodes);
+            nodes.addAll(selectedDatanodes);
+            List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
+                    sourceDatanodesWithIndex = new ArrayList<>();
+            for (Pair<ContainerReplica, NodeStatus> src : sources.values()) {
+              sourceDatanodesWithIndex.add(
+                      new ReconstructECContainersCommand
+                              .DatanodeDetailsAndReplicaIndex(
+                              src.getLeft().getDatanodeDetails(),
+                              src.getLeft().getReplicaIndex()));
+            }
 
-          final ReconstructECContainersCommand reconstructionCommand =
-              new ReconstructECContainersCommand(id.getProtobuf().getId(),
-                  sourceDatanodesWithIndex, selectedDatanodes,
-                  int2byte(missingIndexes),
-                  repConfig);
-          // Keeping the first target node as coordinator.
-          commands.put(selectedDatanodes.get(0), reconstructionCommand);
+            final ReconstructECContainersCommand reconstructionCommand =
+                    new ReconstructECContainersCommand(id.getProtobuf().getId(),
+                            sourceDatanodesWithIndex, selectedDatanodes,
+                            int2byte(missingIndexes),
+                            repConfig);
+            // Keeping the first target node as coordinator.
+            commands.put(selectedDatanodes.get(0), reconstructionCommand);
+          }
         } else {
           LOG.warn("Cannot proceed for EC container reconstruction for {}, due"
               + " to insufficient source replicas found. Number of source "
@@ -192,37 +216,37 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
               repConfig.getData(), sources.size(), sources);
         }
       }
-
       Set<Integer> decomIndexes = replicaCount.decommissioningOnlyIndexes(true);
       if (decomIndexes.size() > 0) {
         final List<DatanodeDetails> selectedDatanodes =
             getTargetDatanodes(excludedNodes, container, decomIndexes.size());
-        excludedNodes.addAll(selectedDatanodes);
-        Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
-        // In this case we need to do one to one copy.
-        for (ContainerReplica replica : replicas) {
-          if (decomIndexes.contains(replica.getReplicaIndex())) {
-            if (!iterator.hasNext()) {
-              LOG.warn("Couldn't find enough targets. Available source"
-                  + " nodes: {}, the target nodes: {}, excluded nodes: {} and"
-                  + "  the decommission indexes: {}",
-                  replicas, selectedDatanodes, excludedNodes, decomIndexes);
-              break;
+        if (validatePlacement(nodes, selectedDatanodes)) {
+          excludedNodes.addAll(selectedDatanodes);
+          Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
+          // In this case we need to do one to one copy.
+          for (ContainerReplica replica : replicas) {
+            if (decomIndexes.contains(replica.getReplicaIndex())) {
+              if (!iterator.hasNext()) {
+                LOG.warn("Couldn't find enough targets. Available source"
+                    + " nodes: {}, the target nodes: {}, excluded nodes: {} and"
+                    + "  the decommission indexes: {}",
+                    replicas, selectedDatanodes, excludedNodes, decomIndexes);
+                break;
+              }
+              DatanodeDetails decommissioningSrcNode
+                  = replica.getDatanodeDetails();
+              final ReplicateContainerCommand replicateCommand =
+                  new ReplicateContainerCommand(id.getProtobuf().getId(),
+                      ImmutableList.of(decommissioningSrcNode));
+              // For EC containers, we need to track the replica index which is
+              // to be replicated, so add it to the command.
+              replicateCommand.setReplicaIndex(replica.getReplicaIndex());
+              DatanodeDetails target = iterator.next();
+              commands.put(target, replicateCommand);
             }
-            DatanodeDetails decommissioningSrcNode
-                = replica.getDatanodeDetails();
-            final ReplicateContainerCommand replicateCommand =
-                new ReplicateContainerCommand(id.getProtobuf().getId(),
-                    ImmutableList.of(decommissioningSrcNode));
-            // For EC containers, we need to track the replica index which is
-            // to be replicated, so add it to the command.
-            replicateCommand.setReplicaIndex(replica.getReplicaIndex());
-            DatanodeDetails target = iterator.next();
-            commands.put(target, replicateCommand);
           }
         }
       }
-
       processMaintenanceOnlyIndexes(replicaCount, replicas, excludedNodes,
           commands);
     } catch (IOException | IllegalStateException ex) {
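
Taken together, both command-producing paths in the handler, EC reconstruction for missing indexes and one-to-one copies for decommissioning indexes, are now skipped whenever validatePlacement returns false, so no command is queued for a layout the policy would reject. A self-contained toy illustration of that validate-then-commit pattern, using plain strings and a stand-in rack rule rather than the real HDDS types; everything in it is hypothetical.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.BiPredicate;

    public final class PlacementGateDemo {
      public static void main(String[] args) {
        List<String> existingReplicaNodes = List.of("rack1/dn1", "rack2/dn2");
        List<String> selectedTargets = List.of("rack1/dn3");

        // Stand-in policy: the combined node set must span at least two racks.
        BiPredicate<List<String>, List<String>> policySatisfied =
            (existing, targets) -> {
              List<String> all = new ArrayList<>(existing);
              all.addAll(targets);
              return all.stream()
                  .map(node -> node.split("/")[0])
                  .distinct()
                  .count() >= 2;
            };

        if (policySatisfied.test(existingReplicaNodes, selectedTargets)) {
          System.out.println("placement ok, queue the command");
        } else {
          System.out.println("placement not satisfied, skip this pass");
        }
      }
    }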
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 7c1c442c9e..0196e025a6 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
@@ -45,6 +46,7 @@ import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -85,7 +87,7 @@ public class TestECUnderReplicationHandler {
     container = ReplicationTestUtil
         .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
     policy = ReplicationTestUtil
-        .getSimpleTestPlacementPolicy(nodeManager, conf);
+            .getSimpleTestPlacementPolicy(nodeManager, conf);
     NodeSchema[] schemas =
         new NodeSchema[] {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
     NodeSchemaManager.getInstance().init(schemas, true);
@@ -195,6 +197,24 @@ public class TestECUnderReplicationHandler {
         availableReplicas, 1, 2, policy);
   }
 
+  @Test
+  public void testUnderReplicationWithInvalidPlacement()
+          throws IOException {
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+            .createReplicas(Pair.of(DECOMMISSIONING, 1),
+                    Pair.of(DECOMMISSIONING, 2), Pair.of(IN_SERVICE, 3),
+                    Pair.of(IN_SERVICE, 4));
+    PlacementPolicy mockedPolicy = Mockito.spy(policy);
+    ContainerPlacementStatus mockedContainerPlacementStatus =
+            Mockito.mock(ContainerPlacementStatus.class);
+    Mockito.when(mockedContainerPlacementStatus.isPolicySatisfied())
+            .thenReturn(false);
+    Mockito.when(mockedPolicy.validateContainerPlacement(Mockito.anyList(),
+            Mockito.anyInt())).thenReturn(mockedContainerPlacementStatus);
+    testUnderReplicationWithMissingIndexes(Collections.emptyList(),
+            availableReplicas, 0, 0, mockedPolicy);
+  }
+
   @Test
   public void testExceptionIfNoNodesFound() {
     PlacementPolicy noNodesPolicy = ReplicationTestUtil


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org