You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by um...@apache.org on 2022/07/18 23:06:11 UTC
[ozone] branch master updated: HDDS-6895. EC: ReplicationManager - Logic to process the under replicated queue and assign work to DNs (#3599)
This is an automated email from the ASF dual-hosted git repository.
umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 9ebb4aa534 HDDS-6895. EC: ReplicationManager - Logic to process the under replicated queue and assign work to DNs (#3599)
9ebb4aa534 is described below
commit 9ebb4aa534ec2971e835281299da57b9afbb43ea
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Tue Jul 19 00:06:05 2022 +0100
HDDS-6895. EC: ReplicationManager - Logic to process the under replicated queue and assign work to DNs (#3599)
---
.../commands/ReconstructECContainersCommand.java | 15 +-
.../commands/ReplicateContainerCommand.java | 20 ++-
.../states/endpoint/TestHeartbeatEndpointTask.java | 8 +-
.../TestReconstructionECContainersCommands.java | 16 +-
.../proto/ScmServerDatanodeHeartbeatProtocol.proto | 1 +
.../replication/ECUnderReplicationHandler.java | 11 +-
.../container/replication/ReplicationManager.java | 36 +++++
.../replication/UnderReplicatedProcessor.java | 125 ++++++++++++++
.../replication/UnhealthyReplicationHandler.java | 9 +-
.../hdds/scm/server/StorageContainerManager.java | 16 ++
.../container/replication/ReplicationTestUtil.java | 21 +++
.../replication/TestECUnderReplicationHandler.java | 63 +++++---
.../replication/TestUnderReplicatedProcessor.java | 179 +++++++++++++++++++++
13 files changed, 485 insertions(+), 35 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconstructECContainersCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconstructECContainersCommand.java
index bccb82a322..203e5e6bed 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconstructECContainersCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconstructECContainersCommand.java
@@ -23,6 +23,7 @@ import java.util.Objects;
import java.util.stream.Collectors;
import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.HddsIdFactory;
import org.apache.hadoop.hdds.client.ECReplicationConfig;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
@@ -47,16 +48,10 @@ public class ReconstructECContainersCommand
List<DatanodeDetailsAndReplicaIndex> sources,
List<DatanodeDetails> targetDatanodes, byte[] missingContainerIndexes,
ECReplicationConfig ecReplicationConfig) {
- super();
- this.containerID = containerID;
- this.sources = sources;
- this.targetDatanodes = targetDatanodes;
- this.missingContainerIndexes =
- Arrays.copyOf(missingContainerIndexes, missingContainerIndexes.length);
- this.ecReplicationConfig = ecReplicationConfig;
+ this(containerID, sources, targetDatanodes, missingContainerIndexes,
+ ecReplicationConfig, HddsIdFactory.getLongId());
}
- // Should be called only for protobuf conversion
public ReconstructECContainersCommand(long containerID,
List<DatanodeDetailsAndReplicaIndex> sourceDatanodes,
List<DatanodeDetails> targetDatanodes, byte[] missingContainerIndexes,
@@ -68,6 +63,10 @@ public class ReconstructECContainersCommand
this.missingContainerIndexes =
Arrays.copyOf(missingContainerIndexes, missingContainerIndexes.length);
this.ecReplicationConfig = ecReplicationConfig;
+ if (targetDatanodes.size() != missingContainerIndexes.length) {
+ throw new IllegalArgumentException("Number of target datanodes and " +
+ "container indexes should be same");
+ }
}
@Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
index e663bed794..f824557778 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
@@ -41,6 +41,7 @@ public class ReplicateContainerCommand
private final long containerID;
private final List<DatanodeDetails> sourceDatanodes;
+ private int replicaIndex = 0;
public ReplicateContainerCommand(long containerID,
List<DatanodeDetails> sourceDatanodes) {
@@ -57,6 +58,10 @@ public class ReplicateContainerCommand
this.sourceDatanodes = sourceDatanodes;
}
+ public void setReplicaIndex(int index) {
+ replicaIndex = index;
+ }
+
@Override
public Type getType() {
return SCMCommandProto.Type.replicateContainerCommand;
@@ -70,6 +75,7 @@ public class ReplicateContainerCommand
for (DatanodeDetails dd : sourceDatanodes) {
builder.addSources(dd.getProtoBufMessage());
}
+ builder.setReplicaIndex(replicaIndex);
return builder.build();
}
@@ -83,9 +89,13 @@ public class ReplicateContainerCommand
.map(DatanodeDetails::getFromProtoBuf)
.collect(Collectors.toList());
- return new ReplicateContainerCommand(protoMessage.getContainerID(),
- datanodeDetails, protoMessage.getCmdId());
-
+ ReplicateContainerCommand cmd =
+ new ReplicateContainerCommand(protoMessage.getContainerID(),
+ datanodeDetails, protoMessage.getCmdId());
+ if (protoMessage.hasReplicaIndex()) {
+ cmd.setReplicaIndex(protoMessage.getReplicaIndex());
+ }
+ return cmd;
}
public long getContainerID() {
@@ -95,4 +105,8 @@ public class ReplicateContainerCommand
public List<DatanodeDetails> getSourceDatanodes() {
return sourceDatanodes;
}
+
+ public int getReplicaIndex() {
+ return replicaIndex;
+ }
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 2daa985635..4e472d9865 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -24,7 +24,9 @@ import static org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutV
import static org.mockito.ArgumentMatchers.any;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -33,6 +35,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandQueueReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
@@ -68,8 +71,11 @@ public class TestHeartbeatEndpointTask {
Mockito.mock(
StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
+ List<DatanodeDetails> targetDns = new ArrayList<>();
+ targetDns.add(MockDatanodeDetails.randomDatanodeDetails());
+ targetDns.add(MockDatanodeDetails.randomDatanodeDetails());
ReconstructECContainersCommand cmd = new ReconstructECContainersCommand(
- 1, emptyList(), emptyList(), new byte[]{2, 5},
+ 1, emptyList(), targetDns, new byte[]{2, 5},
new ECReplicationConfig(3, 2));
Mockito.when(scm.sendHeartbeat(any()))
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/protocol/commands/TestReconstructionECContainersCommands.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/protocol/commands/TestReconstructionECContainersCommands.java
index 5b8cac8665..5f1601d8f0 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/protocol/commands/TestReconstructionECContainersCommands.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/protocol/commands/TestReconstructionECContainersCommands.java
@@ -25,6 +25,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
@@ -33,6 +34,19 @@ import java.util.stream.Collectors;
*/
public class TestReconstructionECContainersCommands {
+ @Test
+ public void testExceptionIfSourceAndMissingNotSameLength() {
+ ECReplicationConfig ecReplicationConfig = new ECReplicationConfig(3, 2);
+ byte[] missingContainerIndexes = {1, 2};
+
+ List<DatanodeDetails> targetDns = new ArrayList<>();
+ targetDns.add(MockDatanodeDetails.randomDatanodeDetails());
+
+ Assert.assertThrows(IllegalArgumentException.class,
+ () -> new ReconstructECContainersCommand(1L, Collections.emptyList(),
+ targetDns, missingContainerIndexes, ecReplicationConfig));
+ }
+
@Test
public void protobufConversion() {
byte[] missingContainerIndexes = {1, 2};
@@ -48,7 +62,7 @@ public class TestReconstructionECContainersCommands {
a -> new ReconstructECContainersCommand
.DatanodeDetailsAndReplicaIndex(a, dnDetails.indexOf(a)))
.collect(Collectors.toList());
- List<DatanodeDetails> targets = getDNDetails(5);
+ List<DatanodeDetails> targets = getDNDetails(2);
ReconstructECContainersCommand reconstructECContainersCommand =
new ReconstructECContainersCommand(1L, sources, targets,
missingContainerIndexes, ecReplicationConfig);
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index e998b846df..602805d3f3 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -409,6 +409,7 @@ message ReplicateContainerCommandProto {
required int64 containerID = 1;
repeated DatanodeDetailsProto sources = 2;
required int64 cmdId = 3;
+ optional int32 replicaIndex = 4;
}
/**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 0f6806e68b..f7a5fd73a8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -81,14 +81,17 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
* @param remainingMaintenanceRedundancy - represents that how many nodes go
* into maintenance.
* @return Returns the key value pair of destination dn where the command gets
- * executed and the command itself.
+ * executed and the command itself. If an empty map is returned, it indicates
+ * the container is no longer unhealthy and can be removed from the unhealthy
+ * queue. Any exception indicates that the container is still unhealthy and
+ * should be retried later.
*/
@Override
public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
final Set<ContainerReplica> replicas,
final List<ContainerReplicaOp> pendingOps,
final ContainerHealthResult result,
- final int remainingMaintenanceRedundancy) {
+ final int remainingMaintenanceRedundancy) throws IOException {
ContainerInfo container = result.getContainerInfo();
ECReplicationConfig repConfig =
(ECReplicationConfig) container.getReplicationConfig();
@@ -199,6 +202,9 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
final ReplicateContainerCommand replicateCommand =
new ReplicateContainerCommand(id.getProtobuf().getId(),
ImmutableList.of(decommissioningSrcNode));
+ // For EC containers, we need to track the replica index which is
+ // to be replicated, so add it to the command.
+ replicateCommand.setReplicaIndex(replica.getReplicaIndex());
DatanodeDetails target = iterator.next();
commands.put(target, replicateCommand);
}
@@ -208,6 +214,7 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
LOG.warn("Exception while processing for creating the EC reconstruction" +
" container commands for {}.",
id, ex);
+ throw ex;
}
return commands;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 7464cc9605..99a6a8db90 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -40,7 +40,9 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.ExitUtil;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -137,6 +139,7 @@ public class ReplicationManager implements SCMService {
private final ReentrantLock lock = new ReentrantLock();
private Queue<ContainerHealthResult.UnderReplicatedHealthResult>
underRepQueue;
+ private final ECUnderReplicationHandler ecUnderReplicationHandler;
/**
* Constructs ReplicationManager instance with the given configuration.
@@ -174,6 +177,8 @@ public class ReplicationManager implements SCMService {
this.ecContainerHealthCheck = new ECContainerHealthCheck();
this.nodeManager = nodeManager;
this.underRepQueue = createUnderReplicatedQueue();
+ ecUnderReplicationHandler = new ECUnderReplicationHandler(
+ containerPlacement, conf, nodeManager);
start();
}
@@ -315,6 +320,21 @@ public class ReplicationManager implements SCMService {
}
}
+ public Map<DatanodeDetails, SCMCommand<?>> processUnderReplicatedContainer(
+ final ContainerHealthResult result) throws IOException {
+ ContainerID containerID = result.getContainerInfo().containerID();
+ Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
+ containerID);
+ List<ContainerReplicaOp> pendingOps =
+ containerReplicaPendingOps.getPendingOps(containerID);
+ return ecUnderReplicationHandler.processAndCreateCommands(replicas,
+ pendingOps, result, 0);
+ }
+
+ public long getScmTerm() throws NotLeaderException {
+ return scmContext.getTermOfLeader();
+ }
+
protected ContainerHealthResult processContainer(ContainerInfo containerInfo,
Queue<ContainerHealthResult.UnderReplicatedHealthResult> underRep,
List<ContainerHealthResult.OverReplicatedHealthResult> overRep,
@@ -431,6 +451,18 @@ public class ReplicationManager implements SCMService {
)
private long interval = Duration.ofSeconds(300).toMillis();
+    /**
+     * Interval at which the under replicated queue is processed.
+     */
+    @Config(key = "under.replicated.interval",
+        type = ConfigType.TIME,
+        defaultValue = "30s",
+        tags = {SCM, OZONE},
+        description = "How frequently to check if there is work to process " +
+            "on the under replicated queue"
+    )
+    private long underReplicatedInterval = Duration.ofSeconds(30).toMillis();
+
/**
* Timeout for container replication & deletion command issued by
* ReplicationManager.
@@ -474,6 +506,10 @@ public class ReplicationManager implements SCMService {
return interval;
}
+ public long getUnderReplicatedInterval() {
+ return underReplicatedInterval;
+ }
+
public long getEventTimeout() {
return eventTimeout;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
new file mode 100644
index 0000000000..f11356bb6f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Picks containers off the ReplicationManager under replicated queue, computes
+ * the commands to fix them and passes those to datanodes via the eventQueue.
+ */
+public class UnderReplicatedProcessor {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(UnderReplicatedProcessor.class);
+  private final ReplicationManager replicationManager;
+  private final ContainerReplicaPendingOps pendingOps;
+  private final EventPublisher eventPublisher;
+
+  public UnderReplicatedProcessor(ReplicationManager replicationManager,
+      ContainerReplicaPendingOps pendingOps,
+      EventPublisher eventPublisher) {
+    this.replicationManager = replicationManager;
+    this.pendingOps = pendingOps;
+    this.eventPublisher = eventPublisher;
+  }
+
+  /**
+   * Drain the ReplicationManager under replicated queue, forming commands to
+   * correct each under replication. The commands are added to the event queue
+   * and the PendingReplicaOps are adjusted. Any failure is logged and the
+   * container is requeued only after the queue is drained; requeuing inside
+   * the loop would let a persistently failing container spin it forever.
+   * Note: a future version needs to limit the work assigned to each datanode.
+   */
+  public void processAll() {
+    int processed = 0;
+    List<ContainerHealthResult.UnderReplicatedHealthResult> failedOnes =
+        new java.util.ArrayList<>();
+    while (true) {
+      ContainerHealthResult.UnderReplicatedHealthResult underRep =
+          replicationManager.dequeueUnderReplicatedContainer();
+      if (underRep == null) {
+        break;
+      }
+      try {
+        processContainer(underRep);
+        processed++;
+      } catch (Exception e) {
+        LOG.error("Error processing under replicated container {}",
+            underRep.getContainerInfo(), e);
+        failedOnes.add(underRep);
+      }
+    }
+    failedOnes.forEach(replicationManager::requeueUnderReplicatedContainer);
+    LOG.info("Processed {} under replicated containers, failed processing {}",
+        processed, failedOnes.size());
+  }
+
+  /** Create and send the commands for one container, recording pending ops. */
+  protected void processContainer(ContainerHealthResult
+      .UnderReplicatedHealthResult underRep) throws IOException {
+    Map<DatanodeDetails, SCMCommand<?>> cmds = replicationManager
+        .processUnderReplicatedContainer(underRep);
+    for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds.entrySet()) {
+      SCMCommand<?> scmCmd = cmd.getValue();
+      scmCmd.setTerm(replicationManager.getScmTerm());
+      final CommandForDatanode<?> datanodeCommand =
+          new CommandForDatanode<>(cmd.getKey().getUuid(), scmCmd);
+      eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+      adjustPendingOps(underRep.getContainerInfo().containerID(),
+          scmCmd, cmd.getKey());
+    }
+  }
+
+  /** Register the replica(s) a command will create as pending add ops. */
+  private void adjustPendingOps(ContainerID containerID, SCMCommand<?> cmd,
+      DatanodeDetails targetDatanode) throws IOException {
+    if (cmd.getType() == StorageContainerDatanodeProtocolProtos
+        .SCMCommandProto.Type.reconstructECContainersCommand) {
+      ReconstructECContainersCommand rcc = (ReconstructECContainersCommand) cmd;
+      List<DatanodeDetails> targets = rcc.getTargetDatanodes();
+      byte[] targetIndexes = rcc.getMissingContainerIndexes();
+      for (int i = 0; i < targetIndexes.length; i++) {
+        pendingOps.scheduleAddReplica(containerID, targets.get(i),
+            targetIndexes[i]);
+      }
+    } else if (cmd.getType() == StorageContainerDatanodeProtocolProtos
+        .SCMCommandProto.Type.replicateContainerCommand) {
+      ReplicateContainerCommand rcc = (ReplicateContainerCommand) cmd;
+      pendingOps.scheduleAddReplica(
+          containerID, targetDatanode, rcc.getReplicaIndex());
+    } else {
+      throw new IOException("Unexpected command type " + cmd.getType());
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
index cf27fcb51c..a6d37c40f6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -41,9 +42,13 @@ public interface UnhealthyReplicationHandler {
* @param remainingMaintenanceRedundancy - represents that how many nodes go
* into maintenance.
* @return Returns the key value pair of destination dn where the command gets
- * executed and the command itself.
+ * executed and the command itself. If an empty map is returned, it indicates
+ * the container is no longer unhealthy and can be removed from the unhealthy
+ * queue. Any exception indicates that the container is still unhealthy and
+ * should be retried later.
*/
Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
- ContainerHealthResult result, int remainingMaintenanceRedundancy);
+ ContainerHealthResult result, int remainingMaintenanceRedundancy)
+ throws IOException;
}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 3da1d74c81..c0a1afc90f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.UnderReplicatedProcessor;
import org.apache.hadoop.hdds.scm.crl.CRLStatusReportHandler;
import org.apache.hadoop.hdds.scm.ha.BackgroundSCMService;
import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
@@ -723,6 +724,21 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
clock,
legacyRM,
containerReplicaPendingOps);
+ ReplicationManager.ReplicationManagerConfiguration rmConf = conf
+ .getObject(ReplicationManager.ReplicationManagerConfiguration.class);
+
+ UnderReplicatedProcessor underReplicatedProcessor =
+ new UnderReplicatedProcessor(replicationManager,
+ containerReplicaPendingOps, eventQueue);
+
+ BackgroundSCMService underReplicatedQueueThread =
+ new BackgroundSCMService.Builder().setClock(clock)
+ .setScmContext(scmContext)
+ .setServiceName("UnderReplicatedQueueThread")
+ .setIntervalInMillis(rmConf.getUnderReplicatedInterval())
+ .setWaitTimeInMillis(backgroundServiceSafemodeWaitMs)
+ .setPeriodicalTask(underReplicatedProcessor::processAll).build();
+ serviceManager.register(underReplicatedQueueThread);
}
serviceManager.register(replicationManager);
if (configurator.getScmSafeModeManager() != null) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index c220223805..97ca424843 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -39,6 +39,7 @@ import java.util.List;
import java.util.Set;
import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE;
/**
* Helper class to provide common methods used to test ReplicationManager.
@@ -142,4 +143,24 @@ public final class ReplicationTestUtil {
}
};
}
+
+ public static PlacementPolicy getNoNodesTestPlacementPolicy(
+ final NodeManager nodeManager, final OzoneConfiguration conf) {
+ return new SCMCommonPlacementPolicy(nodeManager, conf) {
+ @Override
+ public List<DatanodeDetails> chooseDatanodes(
+ List<DatanodeDetails> excludedNodes,
+ List<DatanodeDetails> favoredNodes, int nodesRequiredToChoose,
+ long metadataSizeRequired, long dataSizeRequired)
+ throws SCMException {
+ throw new SCMException("No nodes available",
+ FAILED_TO_FIND_SUITABLE_NODE);
+ }
+
+ @Override
+ public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
+ return null;
+ }
+ };
+ }
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 014306af27..2a5b18b8cf 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.net.NodeSchema;
import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -42,6 +43,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
+import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -52,6 +54,7 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalSt
import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
+import static org.junit.Assert.assertThrows;
/**
* Tests the ECUnderReplicationHandling functionality.
@@ -84,73 +87,96 @@ public class TestECUnderReplicationHandler {
}
@Test
- public void testUnderReplicationWithMissingParityIndex5() {
+ public void testUnderReplicationWithMissingParityIndex5() throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4));
testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
- availableReplicas, 0);
+ availableReplicas, 0, policy);
}
@Test
- public void testUnderReplicationWithMissingIndex34() {
+ public void testUnderReplicationWithMissingIndex34() throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
Pair.of(IN_SERVICE, 5));
testUnderReplicationWithMissingIndexes(ImmutableList.of(3, 4),
- availableReplicas, 0);
+ availableReplicas, 0, policy);
}
@Test
- public void testUnderReplicationWithMissingIndex2345() {
+ public void testUnderReplicationWithMissingIndex2345() throws IOException {
Set<ContainerReplica> availableReplicas =
ReplicationTestUtil.createReplicas(Pair.of(IN_SERVICE, 1));
testUnderReplicationWithMissingIndexes(ImmutableList.of(2, 3, 4, 5),
- availableReplicas, 0);
+ availableReplicas, 0, policy);
}
@Test
- public void testUnderReplicationWithMissingIndex12345() {
+ public void testUnderReplicationWithMissingIndex12345() throws IOException {
Set<ContainerReplica> availableReplicas = new HashSet<>();
testUnderReplicationWithMissingIndexes(ImmutableList.of(1, 2, 3, 4, 5),
- availableReplicas, 0);
+ availableReplicas, 0, policy);
}
@Test
- public void testUnderReplicationWithDecomIndex1() {
+ public void testUnderReplicationWithDecomIndex1() throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(DECOMMISSIONING, 1), Pair.of(IN_SERVICE, 2),
Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
Pair.of(IN_SERVICE, 5));
- testUnderReplicationWithMissingIndexes(Lists.emptyList(), availableReplicas,
- 1);
+ Map<DatanodeDetails, SCMCommand<?>> cmds =
+ testUnderReplicationWithMissingIndexes(
+ Lists.emptyList(), availableReplicas, 1, policy);
+ Assert.assertEquals(1, cmds.size());
+ // Check the replicate command has index 1 set
+ ReplicateContainerCommand cmd = (ReplicateContainerCommand) cmds.values()
+ .iterator().next();
+ Assert.assertEquals(1, cmd.getReplicaIndex());
+
}
@Test
- public void testUnderReplicationWithDecomIndex12() {
+ public void testUnderReplicationWithDecomIndex12() throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(DECOMMISSIONING, 1),
Pair.of(DECOMMISSIONING, 2), Pair.of(IN_SERVICE, 3),
Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
testUnderReplicationWithMissingIndexes(Lists.emptyList(), availableReplicas,
- 2);
+ 2, policy);
}
@Test
- public void testUnderReplicationWithMixedDecomAndMissingIndexes() {
+ public void testUnderReplicationWithMixedDecomAndMissingIndexes()
+ throws IOException {
Set<ContainerReplica> availableReplicas = ReplicationTestUtil
.createReplicas(Pair.of(DECOMMISSIONING, 1),
Pair.of(DECOMMISSIONING, 2), Pair.of(IN_SERVICE, 3),
Pair.of(IN_SERVICE, 4));
testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
- availableReplicas, 2);
+ availableReplicas, 2, policy);
+ }
+
+ @Test
+ public void testExceptionIfNoNodesFound() throws IOException {
+ PlacementPolicy noNodesPolicy = ReplicationTestUtil
+ .getNoNodesTestPlacementPolicy(nodeManager, conf);
+ Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+ .createReplicas(Pair.of(DECOMMISSIONING, 1),
+ Pair.of(DECOMMISSIONING, 2), Pair.of(IN_SERVICE, 3),
+ Pair.of(IN_SERVICE, 4));
+ assertThrows(SCMException.class, () ->
+ testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
+ availableReplicas, 2, noNodesPolicy));
+
}
- public void testUnderReplicationWithMissingIndexes(
+ public Map<DatanodeDetails, SCMCommand<?>>
+ testUnderReplicationWithMissingIndexes(
List<Integer> missingIndexes, Set<ContainerReplica> availableReplicas,
- int decomIndexes) {
+ int decomIndexes, PlacementPolicy placementPolicy) throws IOException {
ECUnderReplicationHandler ecURH =
- new ECUnderReplicationHandler(policy, conf, nodeManager);
+ new ECUnderReplicationHandler(placementPolicy, conf, nodeManager);
ContainerHealthResult.UnderReplicatedHealthResult result =
Mockito.mock(ContainerHealthResult.UnderReplicatedHealthResult.class);
Mockito.when(result.isUnrecoverable()).thenReturn(false);
@@ -186,5 +212,6 @@ public class TestECUnderReplicationHandler {
Assert.assertEquals(decomIndexes, replicateCommand);
Assert.assertEquals(shouldReconstructCommandExist ? 1 : 0,
reconstructCommand);
+ return datanodeDetailsSCMCommandMap;
}
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
new file mode 100644
index 0000000000..3f3cfeaa8b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.ozone.test.TestClock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.eq;
+
+/**
+ * Tests for the UnderReplicatedProcessor class.
+ * <p>
+ * ReplicationManager and EventPublisher are Mockito mocks, while
+ * ContainerReplicaPendingOps is a real instance driven by a TestClock, so
+ * each test can inspect the pending ops recorded by the processor.
+ */
+public class TestUnderReplicatedProcessor {
+
+  private ConfigurationSource conf;
+  private TestClock clock;
+  private ContainerReplicaPendingOps pendingOps;
+  private ReplicationManager replicationManager;
+  private EventPublisher eventPublisher;
+  private ECReplicationConfig repConfig;
+  private UnderReplicatedProcessor underReplicatedProcessor;
+
+  @Before
+  public void setup() {
+    conf = new OzoneConfiguration();
+    clock = new TestClock(Instant.now(), ZoneId.systemDefault());
+    pendingOps = new ContainerReplicaPendingOps(conf, clock);
+    replicationManager = Mockito.mock(ReplicationManager.class);
+    eventPublisher = Mockito.mock(EventPublisher.class);
+    repConfig = new ECReplicationConfig(3, 2);
+    underReplicatedProcessor = new UnderReplicatedProcessor(
+        replicationManager, pendingOps, eventPublisher);
+  }
+
+  /**
+   * A reconstruction command returned by the ReplicationManager is fired
+   * once as a DATANODE_COMMAND, the container is not requeued, and a pending
+   * op is recorded for each target node with that node's missing index.
+   */
+  @Test
+  public void testEcReconstructionCommand() throws IOException {
+    ContainerInfo container = ReplicationTestUtil
+        .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
+    // The trailing null presumably tells processAll() the queue is drained.
+    Mockito.when(replicationManager.dequeueUnderReplicatedContainer())
+        .thenReturn(new ContainerHealthResult
+            .UnderReplicatedHealthResult(container, 3, false, false, false),
+            null);
+    List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
+        sourceNodes = new ArrayList<>();
+    for (int i = 1; i <= 3; i++) {
+      sourceNodes.add(
+          new ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex(
+              MockDatanodeDetails.randomDatanodeDetails(), i));
+    }
+    List<DatanodeDetails> targetNodes = new ArrayList<>();
+    targetNodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    targetNodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    // Pending ops should pair targetNodes.get(i) with missingIndexes[i].
+    byte[] missingIndexes = {4, 5};
+
+    Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+    commands.put(MockDatanodeDetails.randomDatanodeDetails(),
+        new ReconstructECContainersCommand(container.getContainerID(),
+            sourceNodes, targetNodes, missingIndexes, repConfig));
+
+    Mockito.when(replicationManager
+        .processUnderReplicatedContainer(Mockito.any()))
+        .thenReturn(commands);
+    underReplicatedProcessor.processAll();
+
+    Mockito.verify(eventPublisher, Mockito.times(1))
+        .fireEvent(eq(SCMEvents.DATANODE_COMMAND), Mockito.any());
+    Mockito.verify(replicationManager, Mockito.times(0))
+        .requeueUnderReplicatedContainer(Mockito.any());
+
+    // Ensure pending ops is updated for the target DNs in the command and the
+    // correct indexes.
+    List<ContainerReplicaOp> ops = pendingOps
+        .getPendingOps(container.containerID());
+    Assert.assertEquals(2, ops.size());
+    for (ContainerReplicaOp op : ops) {
+      int ind = targetNodes.indexOf(op.getTarget());
+      // Fail with a clear message instead of an
+      // ArrayIndexOutOfBoundsException if the op targets an unknown node.
+      Assert.assertTrue("Pending op has unexpected target " + op.getTarget(),
+          ind >= 0);
+      Assert.assertEquals(missingIndexes[ind], op.getReplicaIndex());
+    }
+  }
+
+  /**
+   * A replicate command returned by the ReplicationManager is fired once as
+   * a DATANODE_COMMAND, the container is not requeued, and a single pending
+   * op is recorded carrying the command's replica index.
+   */
+  @Test
+  public void testEcReplicationCommand() throws IOException {
+    ContainerInfo container = ReplicationTestUtil
+        .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
+    Mockito.when(replicationManager.dequeueUnderReplicatedContainer())
+        .thenReturn(new ContainerHealthResult
+            .UnderReplicatedHealthResult(container, 3, true, false, false),
+            null);
+    List<DatanodeDetails> sourceDns = new ArrayList<>();
+    sourceDns.add(MockDatanodeDetails.randomDatanodeDetails());
+    DatanodeDetails targetDn = MockDatanodeDetails.randomDatanodeDetails();
+    ReplicateContainerCommand rcc = new ReplicateContainerCommand(
+        container.getContainerID(), sourceDns);
+    rcc.setReplicaIndex(3);
+
+    Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+    commands.put(targetDn, rcc);
+
+    Mockito.when(replicationManager
+        .processUnderReplicatedContainer(Mockito.any()))
+        .thenReturn(commands);
+    underReplicatedProcessor.processAll();
+
+    Mockito.verify(eventPublisher, Mockito.times(1))
+        .fireEvent(eq(SCMEvents.DATANODE_COMMAND), Mockito.any());
+    Mockito.verify(replicationManager, Mockito.times(0))
+        .requeueUnderReplicatedContainer(Mockito.any());
+
+    // Ensure pending ops is updated for the target DN in the command and the
+    // correct index.
+    List<ContainerReplicaOp> ops = pendingOps
+        .getPendingOps(container.containerID());
+    Assert.assertEquals(1, ops.size());
+    Assert.assertEquals(3, ops.get(0).getReplicaIndex());
+  }
+
+  /**
+   * If processing the container throws an IOException, no command is fired,
+   * the container is requeued once, and no pending ops are recorded.
+   */
+  @Test
+  public void testMessageRequeuedOnException() throws IOException {
+    ContainerInfo container = ReplicationTestUtil
+        .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
+    Mockito.when(replicationManager.dequeueUnderReplicatedContainer())
+        .thenReturn(new ContainerHealthResult
+            .UnderReplicatedHealthResult(container, 3, false, false, false),
+            null);
+
+    Mockito.when(replicationManager
+        .processUnderReplicatedContainer(Mockito.any()))
+        .thenThrow(new IOException("Test Exception"));
+    underReplicatedProcessor.processAll();
+
+    Mockito.verify(eventPublisher, Mockito.times(0))
+        .fireEvent(eq(SCMEvents.DATANODE_COMMAND), Mockito.any());
+    Mockito.verify(replicationManager, Mockito.times(1))
+        .requeueUnderReplicatedContainer(Mockito.any());
+
+    // Ensure pending ops has nothing for this container.
+    List<ContainerReplicaOp> ops = pendingOps
+        .getPendingOps(container.containerID());
+    Assert.assertEquals(0, ops.size());
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org