You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@ozone.apache.org by so...@apache.org on 2023/04/18 08:51:00 UTC
[ozone] branch master updated: HDDS-8333. ReplicationManager: Allow partial EC reconstruction if insufficient nodes available (#4579)
This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new b74faeff91 HDDS-8333. ReplicationManager: Allow partial EC reconstruction if insufficient nodes available (#4579)
b74faeff91 is described below
commit b74faeff9105f42dfd98184c5b76bc63db70e2c2
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Tue Apr 18 09:50:53 2023 +0100
HDDS-8333. ReplicationManager: Allow partial EC reconstruction if insufficient nodes available (#4579)
---
.../replication/ECUnderReplicationHandler.java | 155 ++++++++++++---------
.../replication/MisReplicationHandler.java | 26 +---
.../replication/RatisUnderReplicationHandler.java | 26 +---
.../replication/ReplicationManagerUtil.java | 93 +++++++++++++
.../replication/TestECUnderReplicationHandler.java | 114 ++++++++++++---
5 files changed, 285 insertions(+), 129 deletions(-)
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index af57775fd6..f07b1c5a6f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException;
import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
@@ -62,12 +63,6 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
private final long currentContainerSize;
private final ReplicationManager replicationManager;
- private static class CannotFindTargetsException extends IOException {
- CannotFindTargetsException(Throwable cause) {
- super(cause);
- }
- }
-
public ECUnderReplicationHandler(final PlacementPolicy containerPlacement,
final ConfigurationSource conf, ReplicationManager replicationManager) {
this.containerPlacement = containerPlacement;
@@ -164,49 +159,60 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
.collect(Collectors.toList());
try {
- commandsSent += processMissingIndexes(replicaCount, sources,
- availableSourceNodes, excludedNodes);
- commandsSent += processDecommissioningIndexes(replicaCount, sources,
- availableSourceNodes, excludedNodes);
- commandsSent += processMaintenanceOnlyIndexes(replicaCount, sources,
- excludedNodes);
- // TODO - we should be able to catch SCMException here and check the
- // result code but the RackAware topology never sets the code.
- } catch (CannotFindTargetsException e) {
- // If we get here, we tried to find nodes to fix the under replication
- // issues, but were not able to find any at some stage, and the
- // placement policy threw an exception.
- // At this stage. If the cluster is small and there are some
- // over replicated indexes, it could stop us finding a new node as there
- // are no more nodes left to try.
- // If the container is also over replicated, then hand it off to the
- // over-rep handler, and after those over-rep indexes are cleared the
- // under replication can be re-tried in the next iteration of RM.
- // However, we should only hand off to the over rep handler if there are
- // no commands already created. If we have some commands, they may
- // attempt to use sources the over-rep handler would remove. So we
- // should let the commands we have created be processed, and then the
- // container will be re-processed in a further RM pass.
- LOG.debug("Unable to located new target nodes for container {}",
- container, e);
- if (commandsSent > 0) {
- LOG.debug("Some commands have already been created, so returning " +
- "with them only");
- return commandsSent;
+ InsufficientDatanodesException firstException = null;
+ try {
+ commandsSent += processMissingIndexes(replicaCount, sources,
+ availableSourceNodes, excludedNodes);
+ } catch (InsufficientDatanodesException e) {
+ firstException = e;
+ }
+ try {
+ commandsSent += processDecommissioningIndexes(replicaCount, sources,
+ availableSourceNodes, excludedNodes);
+ } catch (InsufficientDatanodesException e) {
+ if (firstException == null) {
+ firstException = e;
+ }
}
+ try {
+ commandsSent += processMaintenanceOnlyIndexes(replicaCount, sources,
+ excludedNodes);
+ } catch (InsufficientDatanodesException e) {
+ if (firstException == null) {
+ firstException = e;
+ }
+ }
+ if (firstException != null) {
+ // We had partial success through some of the steps, so just throw the
+ // first exception we got. This will cause the container to be
+ // re-queued and try again later.
+ throw firstException;
+ }
+ } catch (SCMException e) {
+ SCMException.ResultCodes code = e.getResult();
+ if (code != SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE) {
+ throw e;
+ }
+ // If we get here, we got an exception indicating the placement policy
+ // was not able to find ANY nodes to satisfy the replication at one of
+ // the processing stages (missing index, decom or maint). It is
+ // possible that some commands were sent to partially fix the
+ // replication, but a further run will be needed to fix the rest.
+ // On a small cluster, it is possible that over replication could stop
+ // nodes getting selected, so to check if that is the case, we run
+ // the over rep handler, which may free some nodes for the next run.
if (replicaCount.isOverReplicated()) {
LOG.debug("Container {} is both under and over replicated. Cannot " +
"find enough target nodes, so handing off to the " +
"OverReplication handler", container);
- return replicationManager.processOverReplicatedContainer(result);
- } else {
- throw (SCMException)e.getCause();
+ replicationManager.processOverReplicatedContainer(result);
}
+ // As we want to re-queue and try again later, we just re-throw
+ throw e;
}
} catch (IOException | IllegalStateException ex) {
- LOG.warn("Exception while processing for creating the EC reconstruction" +
- " container commands for container {}.",
- id, ex);
+ LOG.warn("Exception while creating the replication or" +
+ " reconstruction commands for container {}.", id, ex);
throw ex;
}
if (commandsSent == 0) {
@@ -253,25 +259,6 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
}));
}
- private List<DatanodeDetails> getTargetDatanodes(
- List<DatanodeDetails> excludedNodes, ContainerInfo container,
- int requiredNodes) throws IOException {
- // We should ensure that the target datanode has enough space
- // for a complete container to be created, but since the container
- // size may be changed smaller than origin, we should be defensive.
- final long dataSizeRequired =
- Math.max(container.getUsedBytes(), currentContainerSize);
- try {
- return containerPlacement
- .chooseDatanodes(excludedNodes, null, requiredNodes, 0,
- dataSizeRequired);
- } catch (SCMException e) {
- // SCMException can come from many places in SCM, so catch it here and
- // throw a more specific exception instead.
- throw new CannotFindTargetsException(e);
- }
- }
-
/**
* Processes replicas that are in maintenance nodes and should need
* additional copies.
@@ -293,9 +280,19 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
int commandsSent = 0;
if (sources.size() >= repConfig.getData()) {
- final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
- excludedNodes, container, missingIndexes.size());
+ int expectedTargets = missingIndexes.size();
+ final List<DatanodeDetails> selectedDatanodes =
+ ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
+ expectedTargets, null, excludedNodes, currentContainerSize,
+ container);
+ // If we got fewer targets than missing indexes, we need to prune the
+ // missing index list so it only tries to recover the number of indexes
+ // we have targets for.
+ if (selectedDatanodes.size() < expectedTargets) {
+ missingIndexes.subList(selectedDatanodes.size(),
+ missingIndexes.size()).clear();
+ }
if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
excludedNodes.addAll(selectedDatanodes);
// TODO - what are we adding all the selected nodes to available
@@ -324,6 +321,14 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
}
commandsSent++;
}
+ if (selectedDatanodes.size() != expectedTargets) {
+ LOG.debug("Insufficient nodes were returned from the placement policy" +
+ " to fully reconstruct container {}. Requested {} received {}",
+ container.getContainerID(), expectedTargets,
+ selectedDatanodes.size());
+ throw new InsufficientDatanodesException(expectedTargets,
+ selectedDatanodes.size());
+ }
} else {
LOG.warn("Cannot proceed for EC container reconstruction for {}, due"
+ " to insufficient source replicas found. Number of source "
@@ -350,7 +355,10 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
int commandsSent = 0;
if (decomIndexes.size() > 0) {
final List<DatanodeDetails> selectedDatanodes =
- getTargetDatanodes(excludedNodes, container, decomIndexes.size());
+ ReplicationManagerUtil.getTargetDatanodes(containerPlacement,
+ decomIndexes.size(), null, excludedNodes, currentContainerSize,
+ container);
+
if (validatePlacement(availableSourceNodes, selectedDatanodes)) {
excludedNodes.addAll(selectedDatanodes);
Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
@@ -377,6 +385,14 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
commandsSent++;
}
}
+ if (selectedDatanodes.size() != decomIndexes.size()) {
+ LOG.debug("Insufficient nodes were returned from the placement policy" +
+ " to fully replicate the decommission indexes for container {}." +
+ " Requested {} received {}", container.getContainerID(),
+ decomIndexes.size(), selectedDatanodes.size());
+ throw new InsufficientDatanodesException(decomIndexes.size(),
+ selectedDatanodes.size());
+ }
}
return commandsSent;
}
@@ -407,8 +423,9 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
if (additionalMaintenanceCopiesNeeded == 0) {
return 0;
}
- List<DatanodeDetails> targets = getTargetDatanodes(excludedNodes, container,
- additionalMaintenanceCopiesNeeded);
+ List<DatanodeDetails> targets = ReplicationManagerUtil.getTargetDatanodes(
+ containerPlacement, maintIndexes.size(), null, excludedNodes,
+ currentContainerSize, container);
excludedNodes.addAll(targets);
Iterator<DatanodeDetails> iterator = targets.iterator();
@@ -439,6 +456,14 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
commandsSent++;
additionalMaintenanceCopiesNeeded -= 1;
}
+ if (targets.size() != maintIndexes.size()) {
+ LOG.debug("Insufficient nodes were returned from the placement policy" +
+ " to fully replicate the maintenance indexes for container {}." +
+ " Requested {} received {}", container.getContainerID(),
+ maintIndexes.size(), targets.size());
+ throw new InsufficientDatanodesException(maintIndexes.size(),
+ targets.size());
+ }
return commandsSent;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
index abf7afe309..4ab4549556 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/MisReplicationHandler.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -76,25 +75,6 @@ public abstract class MisReplicationHandler implements
List<ContainerReplicaOp> pendingOps, int remainingMaintenanceRedundancy)
throws IOException;
- private List<DatanodeDetails> getTargetDatanodes(
- List<DatanodeDetails> usedNodes, List<DatanodeDetails> excludedNodes,
- ContainerInfo container, int requiredNodes) throws IOException {
- final long dataSizeRequired =
- Math.max(container.getUsedBytes(), currentContainerSize);
- while (requiredNodes > 0) {
- try {
- return containerPlacement.chooseDatanodes(usedNodes, excludedNodes,
- null, requiredNodes, 0, dataSizeRequired);
- } catch (IOException e) {
- requiredNodes -= 1;
- }
- }
- throw new SCMException(String.format("Placement Policy: %s did not return"
- + " any nodes. Number of required Nodes %d, Datasize Required: %d",
- containerPlacement.getClass(), requiredNodes, dataSizeRequired),
- SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
- }
-
private Set<ContainerReplica> filterSources(Set<ContainerReplica> replicas) {
return replicas.stream()
.filter(r -> r.getState() == StorageContainerDatanodeProtocolProtos
@@ -196,8 +176,10 @@ public abstract class MisReplicationHandler implements
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
int requiredNodes = replicasToBeReplicated.size();
- List<DatanodeDetails> targetDatanodes = getTargetDatanodes(usedDns,
- excludedDns, container, requiredNodes);
+
+ List<DatanodeDetails> targetDatanodes = ReplicationManagerUtil
+ .getTargetDatanodes(containerPlacement, requiredNodes, usedDns,
+ excludedDns, currentContainerSize, container);
int count = sendReplicateCommands(container, replicasToBeReplicated,
targetDatanodes);
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
index 3f82bbb0eb..cac0be0ee3 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisUnderReplicationHandler.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
@@ -241,28 +240,9 @@ public class RatisUnderReplicationHandler
.collect(Collectors.toList());
excludeList.addAll(pendingReplication);
- /*
- Ensure that target datanodes have enough space to hold a complete
- container.
- */
- final long dataSizeRequired =
- Math.max(replicaCount.getContainer().getUsedBytes(),
- currentContainerSize);
- int requiredNodes = replicaCount.additionalReplicaNeeded();
- while (requiredNodes > 0) {
- try {
- return placementPolicy.chooseDatanodes(excludeList, null,
- requiredNodes, 0, dataSizeRequired);
- } catch (IOException e) {
- LOG.debug("Placement policy was not able to return {} nodes. ",
- requiredNodes, e);
- requiredNodes--;
- }
- }
- throw new SCMException(String.format("Placement Policy: %s did not return"
- + " any nodes. Number of required Nodes %d, Datasize Required: %d",
- placementPolicy.getClass(), requiredNodes, dataSizeRequired),
- SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ return ReplicationManagerUtil.getTargetDatanodes(placementPolicy,
+ replicaCount.additionalReplicaNeeded(), null, excludeList,
+ currentContainerSize, replicaCount.getContainer());
}
private int sendReplicationCommands(
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
new file mode 100644
index 0000000000..12b1eecf61
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManagerUtil.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Utility class for ReplicationManager.
+ */
+public final class ReplicationManagerUtil {
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ ReplicationManagerUtil.class);
+
+ private ReplicationManagerUtil() {
+ }
+
+ /**
+ * Using the passed placement policy, attempt to select a list of datanodes
+ * to use as new targets. If the policy cannot select enough nodes, the
+ * number requested is reduced by 1 and the policy is called again, until it
+ * succeeds or the number requested reaches zero, when an SCMException is
+ * thrown. If requiredNodes <= 0, the exception is thrown immediately.
+ * @param policy The placement policy to use to select nodes.
+ * @param requiredNodes The number of nodes required
+ * @param usedNodes Any nodes already used by the container; may be null
+ * @param excludedNodes Any excluded nodes which cannot be selected
+ * @param defaultContainerSize The cluster default max container size; the
+ * larger of this and the container's used bytes is the size requested.
+ * @param container The container to select new replicas for
+ * @return A list of up to requiredNodes datanodes to use as targets for new
+ * replicas. Note the number of nodes returned may be less than the
+ * number of nodes requested if the placement policy is unable to
+ * return enough nodes.
+ * @throws SCMException (FAILED_TO_FIND_SUITABLE_NODE) if no nodes are found.
+ */
+ public static List<DatanodeDetails> getTargetDatanodes(PlacementPolicy policy,
+ int requiredNodes, List<DatanodeDetails> usedNodes,
+ List<DatanodeDetails> excludedNodes, long defaultContainerSize,
+ ContainerInfo container) throws SCMException {
+
+ // Ensure that target datanodes have enough space to hold a complete
+ // container.
+ final long dataSizeRequired =
+ Math.max(container.getUsedBytes(), defaultContainerSize);
+
+ int mutableRequiredNodes = requiredNodes;
+ while (mutableRequiredNodes > 0) {
+ try {
+ if (usedNodes == null) {
+ return policy.chooseDatanodes(excludedNodes, null,
+ mutableRequiredNodes, 0, dataSizeRequired);
+ } else {
+ return policy.chooseDatanodes(usedNodes, excludedNodes, null,
+ mutableRequiredNodes, 0, dataSizeRequired);
+ }
+ } catch (IOException e) {
+ LOG.debug("Placement policy was not able to return {} nodes for " +
+ "container {}.",
+ mutableRequiredNodes, container.getContainerID(), e);
+ mutableRequiredNodes--;
+ }
+ }
+ throw new SCMException(String.format("Placement Policy: %s did not return"
+ + " any nodes. Number of required Nodes %d, Datasize Required: %d",
+ policy.getClass(), requiredNodes, dataSizeRequired),
+ SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE);
+ }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index b42add9ecf..09f10bdf5f 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
+import org.apache.hadoop.hdds.scm.pipeline.InsufficientDatanodesException;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
@@ -461,14 +462,87 @@ public class TestECUnderReplicationHandler {
return expectedDelete.size();
});
commandsSent.clear();
- ecURH.processAndSendCommands(availableReplicas,
- Collections.emptyList(), underRep, 2);
+ assertThrows(SCMException.class,
+ () -> ecURH.processAndSendCommands(availableReplicas,
+ Collections.emptyList(), underRep, 2));
Mockito.verify(replicationManager, times(1))
.processOverReplicatedContainer(underRep);
Assertions.assertEquals(true, expectedDelete.equals(commandsSent));
}
}
+ /**
+ * Only replica indexes 1-3 are present and the placement policy comes from
+ * ReplicationTestUtil#getInsufficientNodesTestPlacementPolicy (NOTE(review):
+ * the meaning of its last argument, 2, is defined there - confirm). The
+ * handler should send one partial ReconstructECContainersCommand with a
+ * single target and then throw InsufficientDatanodesException.
+ */
+ @Test
+ public void testPartialReconstructionIfNotEnoughNodes() {
+ Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+ .createReplicas(Pair.of(IN_SERVICE, 1),
+ Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3));
+ PlacementPolicy placementPolicy = ReplicationTestUtil
+ .getInsufficientNodesTestPlacementPolicy(nodeManager, conf, 2);
+ ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
+ placementPolicy, conf, replicationManager);
+
+ ContainerHealthResult.UnderReplicatedHealthResult underRep =
+ new ContainerHealthResult.UnderReplicatedHealthResult(container,
+ 0, false, false, false);
+
+ assertThrows(InsufficientDatanodesException.class, () ->
+ ecURH.processAndSendCommands(availableReplicas, Collections.emptyList(),
+ underRep, 1));
+ Assertions.assertEquals(1, commandsSent.size());
+ ReconstructECContainersCommand cmd = (ReconstructECContainersCommand)
+ commandsSent.iterator().next().getValue();
+ Assertions.assertEquals(1, cmd.getTargetDatanodes().size());
+ }
+
+ /**
+ * Indexes 4 and 5 are on DECOMMISSIONING nodes and the placement policy
+ * comes from ReplicationTestUtil#getInsufficientNodesTestPlacementPolicy
+ * (NOTE(review): meaning of its last argument, 2, is defined there). Only
+ * one replicateContainerCommand should be sent before the handler throws
+ * InsufficientDatanodesException.
+ */
+ @Test
+ public void testPartialDecommissionIfNotEnoughNodes() {
+ Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+ .createReplicas(Pair.of(IN_SERVICE, 1),
+ Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+ Pair.of(DECOMMISSIONING, 4), Pair.of(DECOMMISSIONING, 5));
+ PlacementPolicy placementPolicy = ReplicationTestUtil
+ .getInsufficientNodesTestPlacementPolicy(nodeManager, conf, 2);
+ ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
+ placementPolicy, conf, replicationManager);
+
+ ContainerHealthResult.UnderReplicatedHealthResult underRep =
+ new ContainerHealthResult.UnderReplicatedHealthResult(container,
+ 0, true, false, false);
+
+ assertThrows(InsufficientDatanodesException.class, () ->
+ ecURH.processAndSendCommands(availableReplicas, Collections.emptyList(),
+ underRep, 1));
+ Assertions.assertEquals(1, commandsSent.size());
+ SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
+ Assertions.assertEquals(
+ SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
+ }
+
+ /**
+ * Indexes 4 and 5 are ENTERING_MAINTENANCE, and the last argument to
+ * processAndSendCommands (presumably the remaining maintenance redundancy -
+ * confirm) is 2. With the insufficient-nodes placement policy from
+ * ReplicationTestUtil, only one replicateContainerCommand should be sent
+ * before the handler throws InsufficientDatanodesException.
+ */
+ @Test
+ public void testPartialMaintenanceIfNotEnoughNodes() {
+ Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+ .createReplicas(Pair.of(IN_SERVICE, 1),
+ Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
+ Pair.of(ENTERING_MAINTENANCE, 4),
+ Pair.of(ENTERING_MAINTENANCE, 5));
+ PlacementPolicy placementPolicy = ReplicationTestUtil
+ .getInsufficientNodesTestPlacementPolicy(nodeManager, conf, 2);
+ ECUnderReplicationHandler ecURH = new ECUnderReplicationHandler(
+ placementPolicy, conf, replicationManager);
+
+ ContainerHealthResult.UnderReplicatedHealthResult underRep =
+ new ContainerHealthResult.UnderReplicatedHealthResult(container,
+ 0, false, false, false);
+
+ assertThrows(InsufficientDatanodesException.class, () ->
+ ecURH.processAndSendCommands(availableReplicas, Collections.emptyList(),
+ underRep, 2));
+ Assertions.assertEquals(1, commandsSent.size());
+ SCMCommand<?> cmd = commandsSent.iterator().next().getValue();
+ Assertions.assertEquals(
+ SCMCommandProto.Type.replicateContainerCommand, cmd.getType());
+ }
+
@Test
public void testUnderRepWithDecommissionAndNotEnoughNodes()
throws IOException {
@@ -511,18 +585,20 @@ public class TestECUnderReplicationHandler {
availableReplicas.add(toAdd);
}
- ecURH.processAndSendCommands(availableReplicas, Collections.emptyList(),
- underRep, 2);
+ assertThrows(SCMException.class,
+ () -> ecURH.processAndSendCommands(availableReplicas,
+ Collections.emptyList(), underRep, 2));
- Mockito.verify(replicationManager, times(0))
+ Mockito.verify(replicationManager, times(1))
.processOverReplicatedContainer(underRep);
Assertions.assertEquals(1, commandsSent.size());
Pair<DatanodeDetails, SCMCommand<?>> pair =
commandsSent.iterator().next();
Assertions.assertEquals(newNode, pair.getKey());
- Assertions.assertEquals(StorageContainerDatanodeProtocolProtos
- .SCMCommandProto.Type.reconstructECContainersCommand,
+ Assertions.assertEquals(
+ SCMCommandProto.Type.reconstructECContainersCommand,
pair.getValue().getType());
+ Mockito.clearInvocations(replicationManager);
commandsSent.clear();
}
}
@@ -552,9 +628,8 @@ public class TestECUnderReplicationHandler {
() -> ecURH.processAndSendCommands(availableReplicas,
Collections.emptyList(), underRep, 1));
- // Now adjust replicas so it is also over replicated. This time rather than
- // throwing it should call the OverRepHandler and return whatever it
- // returns, which in this case is a delete command for replica index 4.
+ // Now adjust replicas so it is also over replicated. This time it should
+ // call the OverRepHandler and then throw
ContainerReplica overRepReplica =
ReplicationTestUtil.createContainerReplica(container.containerID(),
4, IN_SERVICE, CLOSED);
@@ -569,8 +644,8 @@ public class TestECUnderReplicationHandler {
commandsSent.addAll(expectedDelete);
return expectedDelete.size();
});
- ecURH.processAndSendCommands(availableReplicas,
- Collections.emptyList(), underRep, 1);
+ assertThrows(SCMException.class, () -> ecURH.processAndSendCommands(
+ availableReplicas, Collections.emptyList(), underRep, 1));
Mockito.verify(replicationManager, times(1))
.processOverReplicatedContainer(underRep);
Assertions.assertEquals(true, expectedDelete.equals(commandsSent));
@@ -594,16 +669,17 @@ public class TestECUnderReplicationHandler {
commandsSent.clear();
// Now add a decommissioning index - we will not get a replicate command
- // for it, as the placement policy will throw an exception as we catch
- // and just return the first reconstruction command. This will not goto
- // the over-rep handler as we have a command already created for the under
- // replication, even if the container is over replicated too.
+ // for it, as the placement policy will throw an exception which will
+ // come up the stack and be thrown out to indicate this container must be
+ // retried.
Set<ContainerReplica> replicas = ReplicationTestUtil
.createReplicas(Pair.of(DECOMMISSIONING, 1),
Pair.of(IN_SERVICE, 2), Pair.of(IN_SERVICE, 3),
Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 4));
- testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
- replicas, 0, 0, sameNodePolicy);
+
+ assertThrows(SCMException.class, () ->
+ testUnderReplicationWithMissingIndexes(ImmutableList.of(5), replicas,
+ 0, 0, sameNodePolicy));
}
@Test
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org