You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by um...@apache.org on 2022/09/26 19:41:57 UTC
[ozone] branch master updated: HDDS-7039. EC: Handle the placement policy check in ECUnderReplicationHandler (#3645)
This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new c7038e08a8 HDDS-7039. EC: Handle the placement policy check in ECUnderReplicationHandler (#3645)
c7038e08a8 is described below
commit c7038e08a8df296108280f3759be55e7cbbf1d78
Author: Swaminathan Balachandran <47...@users.noreply.github.com>
AuthorDate: Mon Sep 26 12:41:51 2022 -0700
HDDS-7039. EC: Handle the placement policy check in ECUnderReplicationHandler (#3645)
---
.../hadoop/hdds/protocol/DatanodeDetails.java | 11 ++
.../replication/ECUnderReplicationHandler.java | 118 +++++++++++++--------
.../replication/TestECUnderReplicationHandler.java | 22 +++-
3 files changed, 103 insertions(+), 48 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index 25826f3e23..32358ef40a 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -234,6 +234,17 @@ public class DatanodeDetails extends NodeImpl implements
}
}
+ /**
+ * Checks whether this node's persisted state is DECOMMISSIONED/DECOMMISSIONING.
+ * @return true if the persisted state is DECOMMISSIONED or DECOMMISSIONING.
+ */
+ public boolean isDecomissioned() {
+ return this.getPersistedOpState() ==
+ HddsProtos.NodeOperationalState.DECOMMISSIONED ||
+ this.getPersistedOpState() ==
+ HddsProtos.NodeOperationalState.DECOMMISSIONING;
+ }
+
/**
* Set the persistedOpState for this instance.
*
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 5feb1600ea..8c3f763741 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -74,6 +75,22 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
this.nodeManager = nodeManager;
}
+ private boolean validatePlacement(List<DatanodeDetails> replicaNodes, // true iff existing + selected nodes satisfy containerPlacement
+ List<DatanodeDetails> selectedNodes) { // NOTE(review): visible callers filter replicaNodes on IN_SERVICE AND isDecomissioned (mutually exclusive), so it is always empty — confirm intent
+ List<DatanodeDetails> nodes = new ArrayList<>(replicaNodes); // copy so the caller's list is not mutated here
+ nodes.addAll(selectedNodes);
+ boolean placementStatus = containerPlacement
+ .validateContainerPlacement(nodes, nodes.size()) // NOTE(review): 2nd arg presumably the replica count to validate for — confirm against PlacementPolicy
+ .isPolicySatisfied();
+ if (!placementStatus) { // callers skip issuing commands when this returns false
+ LOG.warn("Selected Nodes does not satisfy placement policy: {}. " +
+ "Selected nodes: {}. Existing Replica Nodes: {}.",
+ containerPlacement.getClass().getName(),
+ selectedNodes, replicaNodes);
+ }
+ return placementStatus;
+ }
+
/**
* Identify a new set of datanode(s) to reconstruct the container and form the
* SCM command to send it to DN. In the case of decommission, it will just
@@ -102,17 +119,14 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
final ECContainerReplicaCount replicaCount =
new ECContainerReplicaCount(container, replicas, pendingOps,
remainingMaintenanceRedundancy);
-
ContainerCheckRequest request = new ContainerCheckRequest.Builder()
.setContainerInfo(container)
.setContainerReplicas(replicas)
.setPendingOps(pendingOps)
.setMaintenanceRedundancy(remainingMaintenanceRedundancy)
.build();
-
ContainerHealthResult currentUnderRepRes = ecReplicationCheck
.checkHealth(request);
-
LOG.debug("Handling under-replicated EC container: {}", container);
if (currentUnderRepRes
.getHealthState() != ContainerHealthResult.HealthState
@@ -122,7 +136,6 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
container.getContainerID(), currentUnderRepRes);
return emptyMap();
}
-
// don't place reconstructed replicas on exclude nodes, since they already
// have replicas
List<DatanodeDetails> excludedNodes = replicas.stream()
@@ -153,37 +166,48 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
}
}
List<Integer> missingIndexes = replicaCount.unavailableIndexes(true);
+ Map<Integer, Pair<ContainerReplica, NodeStatus>> sources =
+ filterSources(replicas, deletionInFlight);
+ List<DatanodeDetails> nodes =
+ sources.values().stream().map(Pair::getLeft)
+ .map(ContainerReplica::getDatanodeDetails)
+ .filter(datanodeDetails ->
+ datanodeDetails.getPersistedOpState() ==
+ HddsProtos.NodeOperationalState.IN_SERVICE)
+ .filter(DatanodeDetails::isDecomissioned)
+ .collect(Collectors.toList());
// We got the missing indexes, this is excluded any decommissioning
// indexes. Find the good source nodes.
if (missingIndexes.size() > 0) {
- Map<Integer, Pair<ContainerReplica, NodeStatus>> sources =
- filterSources(replicas, deletionInFlight);
+
LOG.debug("Missing indexes detected for the container {}." +
" The missing indexes are {}", id, missingIndexes);
// We have source nodes.
if (sources.size() >= repConfig.getData()) {
final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
excludedNodes, container, missingIndexes.size());
- // any further processing shouldn't include these nodes as targets
- excludedNodes.addAll(selectedDatanodes);
- List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
- sourceDatanodesWithIndex = new ArrayList<>();
- for (Pair<ContainerReplica, NodeStatus> src : sources.values()) {
- sourceDatanodesWithIndex.add(
- new ReconstructECContainersCommand
- .DatanodeDetailsAndReplicaIndex(
- src.getLeft().getDatanodeDetails(),
- src.getLeft().getReplicaIndex()));
- }
+ if (validatePlacement(nodes, selectedDatanodes)) {
+ excludedNodes.addAll(selectedDatanodes);
+ nodes.addAll(selectedDatanodes);
+ List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
+ sourceDatanodesWithIndex = new ArrayList<>();
+ for (Pair<ContainerReplica, NodeStatus> src : sources.values()) {
+ sourceDatanodesWithIndex.add(
+ new ReconstructECContainersCommand
+ .DatanodeDetailsAndReplicaIndex(
+ src.getLeft().getDatanodeDetails(),
+ src.getLeft().getReplicaIndex()));
+ }
- final ReconstructECContainersCommand reconstructionCommand =
- new ReconstructECContainersCommand(id.getProtobuf().getId(),
- sourceDatanodesWithIndex, selectedDatanodes,
- int2byte(missingIndexes),
- repConfig);
- // Keeping the first target node as coordinator.
- commands.put(selectedDatanodes.get(0), reconstructionCommand);
+ final ReconstructECContainersCommand reconstructionCommand =
+ new ReconstructECContainersCommand(id.getProtobuf().getId(),
+ sourceDatanodesWithIndex, selectedDatanodes,
+ int2byte(missingIndexes),
+ repConfig);
+ // Keeping the first target node as coordinator.
+ commands.put(selectedDatanodes.get(0), reconstructionCommand);
+ }
} else {
LOG.warn("Cannot proceed for EC container reconstruction for {}, due"
+ " to insufficient source replicas found. Number of source "
@@ -192,37 +216,37 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
repConfig.getData(), sources.size(), sources);
}
}
-
Set<Integer> decomIndexes = replicaCount.decommissioningOnlyIndexes(true);
if (decomIndexes.size() > 0) {
final List<DatanodeDetails> selectedDatanodes =
getTargetDatanodes(excludedNodes, container, decomIndexes.size());
- excludedNodes.addAll(selectedDatanodes);
- Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
- // In this case we need to do one to one copy.
- for (ContainerReplica replica : replicas) {
- if (decomIndexes.contains(replica.getReplicaIndex())) {
- if (!iterator.hasNext()) {
- LOG.warn("Couldn't find enough targets. Available source"
- + " nodes: {}, the target nodes: {}, excluded nodes: {} and"
- + " the decommission indexes: {}",
- replicas, selectedDatanodes, excludedNodes, decomIndexes);
- break;
+ if (validatePlacement(nodes, selectedDatanodes)) {
+ excludedNodes.addAll(selectedDatanodes);
+ Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
+ // In this case we need to do one to one copy.
+ for (ContainerReplica replica : replicas) {
+ if (decomIndexes.contains(replica.getReplicaIndex())) {
+ if (!iterator.hasNext()) {
+ LOG.warn("Couldn't find enough targets. Available source"
+ + " nodes: {}, the target nodes: {}, excluded nodes: {} and"
+ + " the decommission indexes: {}",
+ replicas, selectedDatanodes, excludedNodes, decomIndexes);
+ break;
+ }
+ DatanodeDetails decommissioningSrcNode
+ = replica.getDatanodeDetails();
+ final ReplicateContainerCommand replicateCommand =
+ new ReplicateContainerCommand(id.getProtobuf().getId(),
+ ImmutableList.of(decommissioningSrcNode));
+ // For EC containers, we need to track the replica index which is
+ // to be replicated, so add it to the command.
+ replicateCommand.setReplicaIndex(replica.getReplicaIndex());
+ DatanodeDetails target = iterator.next();
+ commands.put(target, replicateCommand);
}
- DatanodeDetails decommissioningSrcNode
- = replica.getDatanodeDetails();
- final ReplicateContainerCommand replicateCommand =
- new ReplicateContainerCommand(id.getProtobuf().getId(),
- ImmutableList.of(decommissioningSrcNode));
- // For EC containers, we need to track the replica index which is
- // to be replicated, so add it to the command.
- replicateCommand.setReplicaIndex(replica.getReplicaIndex());
- DatanodeDetails target = iterator.next();
- commands.put(target, replicateCommand);
}
}
}
-
processMaintenanceOnlyIndexes(replicaCount, replicas, excludedNodes,
commands);
} catch (IOException | IllegalStateException ex) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 7c1c442c9e..0196e025a6 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
@@ -45,6 +46,7 @@ import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -85,7 +87,7 @@ public class TestECUnderReplicationHandler {
container = ReplicationTestUtil
.createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
policy = ReplicationTestUtil
- .getSimpleTestPlacementPolicy(nodeManager, conf);
+ .getSimpleTestPlacementPolicy(nodeManager, conf);
NodeSchema[] schemas =
new NodeSchema[] {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA};
NodeSchemaManager.getInstance().init(schemas, true);
@@ -195,6 +197,24 @@ public class TestECUnderReplicationHandler {
availableReplicas, 1, 2, policy);
}
+ @Test
+ public void testUnderReplicationWithInvalidPlacement() // placement validation is forced to fail for every candidate set
+ throws IOException {
+ Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+ .createReplicas(Pair.of(DECOMMISSIONING, 1),
+ Pair.of(DECOMMISSIONING, 2), Pair.of(IN_SERVICE, 3),
+ Pair.of(IN_SERVICE, 4));
+ PlacementPolicy mockedPolicy = Mockito.spy(policy); // spy keeps the real policy; only validation is stubbed below
+ ContainerPlacementStatus mockedContainerPlacementStatus =
+ Mockito.mock(ContainerPlacementStatus.class);
+ Mockito.when(mockedContainerPlacementStatus.isPolicySatisfied())
+ .thenReturn(false); // every validated placement reports "not satisfied"
+ Mockito.when(mockedPolicy.validateContainerPlacement(Mockito.anyList(),
+ Mockito.anyInt())).thenReturn(mockedContainerPlacementStatus);
+ testUnderReplicationWithMissingIndexes(Collections.emptyList(),
+ availableReplicas, 0, 0, mockedPolicy); // NOTE(review): 0, 0 presumably expected command counts — confirm against the helper
+ }
+
@Test
public void testExceptionIfNoNodesFound() {
PlacementPolicy noNodesPolicy = ReplicationTestUtil
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org