You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by um...@apache.org on 2022/07/18 23:06:11 UTC

[ozone] branch master updated: HDDS-6895. EC: ReplicationManager - Logic to process the under replicated queue and assign work to DNs (#3599)

This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 9ebb4aa534 HDDS-6895. EC: ReplicationManager - Logic to process the under replicated queue and assign work to DNs (#3599)
9ebb4aa534 is described below

commit 9ebb4aa534ec2971e835281299da57b9afbb43ea
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Tue Jul 19 00:06:05 2022 +0100

    HDDS-6895. EC: ReplicationManager - Logic to process the under replicated queue and assign work to DNs (#3599)
---
 .../commands/ReconstructECContainersCommand.java   |  15 +-
 .../commands/ReplicateContainerCommand.java        |  20 ++-
 .../states/endpoint/TestHeartbeatEndpointTask.java |   8 +-
 .../TestReconstructionECContainersCommands.java    |  16 +-
 .../proto/ScmServerDatanodeHeartbeatProtocol.proto |   1 +
 .../replication/ECUnderReplicationHandler.java     |  11 +-
 .../container/replication/ReplicationManager.java  |  36 +++++
 .../replication/UnderReplicatedProcessor.java      | 125 ++++++++++++++
 .../replication/UnhealthyReplicationHandler.java   |   9 +-
 .../hdds/scm/server/StorageContainerManager.java   |  16 ++
 .../container/replication/ReplicationTestUtil.java |  21 +++
 .../replication/TestECUnderReplicationHandler.java |  63 +++++---
 .../replication/TestUnderReplicatedProcessor.java  | 179 +++++++++++++++++++++
 13 files changed, 485 insertions(+), 35 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconstructECContainersCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconstructECContainersCommand.java
index bccb82a322..203e5e6bed 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconstructECContainersCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReconstructECContainersCommand.java
@@ -23,6 +23,7 @@ import java.util.Objects;
 import java.util.stream.Collectors;
 
 import com.google.protobuf.ByteString;
+import org.apache.hadoop.hdds.HddsIdFactory;
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
@@ -47,16 +48,10 @@ public class ReconstructECContainersCommand
       List<DatanodeDetailsAndReplicaIndex> sources,
       List<DatanodeDetails> targetDatanodes, byte[] missingContainerIndexes,
       ECReplicationConfig ecReplicationConfig) {
-    super();
-    this.containerID = containerID;
-    this.sources = sources;
-    this.targetDatanodes = targetDatanodes;
-    this.missingContainerIndexes =
-        Arrays.copyOf(missingContainerIndexes, missingContainerIndexes.length);
-    this.ecReplicationConfig = ecReplicationConfig;
+    this(containerID, sources, targetDatanodes, missingContainerIndexes,
+        ecReplicationConfig, HddsIdFactory.getLongId());
   }
 
-  // Should be called only for protobuf conversion
   public ReconstructECContainersCommand(long containerID,
       List<DatanodeDetailsAndReplicaIndex> sourceDatanodes,
       List<DatanodeDetails> targetDatanodes, byte[] missingContainerIndexes,
@@ -68,6 +63,10 @@ public class ReconstructECContainersCommand
     this.missingContainerIndexes =
         Arrays.copyOf(missingContainerIndexes, missingContainerIndexes.length);
     this.ecReplicationConfig = ecReplicationConfig;
+    if (targetDatanodes.size() != missingContainerIndexes.length) {
+      throw new IllegalArgumentException("Number of target datanodes and " +
+          "container indexes should be same");
+    }
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
index e663bed794..f824557778 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
@@ -41,6 +41,7 @@ public class ReplicateContainerCommand
 
   private final long containerID;
   private final List<DatanodeDetails> sourceDatanodes;
+  private int replicaIndex = 0;
 
   public ReplicateContainerCommand(long containerID,
       List<DatanodeDetails> sourceDatanodes) {
@@ -57,6 +58,10 @@ public class ReplicateContainerCommand
     this.sourceDatanodes = sourceDatanodes;
   }
 
+  public void setReplicaIndex(int index) {
+    replicaIndex = index;
+  }
+
   @Override
   public Type getType() {
     return SCMCommandProto.Type.replicateContainerCommand;
@@ -70,6 +75,7 @@ public class ReplicateContainerCommand
     for (DatanodeDetails dd : sourceDatanodes) {
       builder.addSources(dd.getProtoBufMessage());
     }
+    builder.setReplicaIndex(replicaIndex);
     return builder.build();
   }
 
@@ -83,9 +89,13 @@ public class ReplicateContainerCommand
             .map(DatanodeDetails::getFromProtoBuf)
             .collect(Collectors.toList());
 
-    return new ReplicateContainerCommand(protoMessage.getContainerID(),
-        datanodeDetails, protoMessage.getCmdId());
-
+    ReplicateContainerCommand cmd =
+        new ReplicateContainerCommand(protoMessage.getContainerID(),
+            datanodeDetails, protoMessage.getCmdId());
+    if (protoMessage.hasReplicaIndex()) {
+      cmd.setReplicaIndex(protoMessage.getReplicaIndex());
+    }
+    return cmd;
   }
 
   public long getContainerID() {
@@ -95,4 +105,8 @@ public class ReplicateContainerCommand
   public List<DatanodeDetails> getSourceDatanodes() {
     return sourceDatanodes;
   }
+
+  public int getReplicaIndex() {
+    return replicaIndex;
+  }
 }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 2daa985635..4e472d9865 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -24,7 +24,9 @@ import static org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutV
 import static org.mockito.ArgumentMatchers.any;
 
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -33,6 +35,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandQueueReportProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
@@ -68,8 +71,11 @@ public class TestHeartbeatEndpointTask {
         Mockito.mock(
             StorageContainerDatanodeProtocolClientSideTranslatorPB.class);
 
+    List<DatanodeDetails> targetDns = new ArrayList<>();
+    targetDns.add(MockDatanodeDetails.randomDatanodeDetails());
+    targetDns.add(MockDatanodeDetails.randomDatanodeDetails());
     ReconstructECContainersCommand cmd = new ReconstructECContainersCommand(
-        1, emptyList(), emptyList(), new byte[]{2, 5},
+        1, emptyList(), targetDns, new byte[]{2, 5},
         new ECReplicationConfig(3, 2));
 
     Mockito.when(scm.sendHeartbeat(any()))
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/protocol/commands/TestReconstructionECContainersCommands.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/protocol/commands/TestReconstructionECContainersCommands.java
index 5b8cac8665..5f1601d8f0 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/protocol/commands/TestReconstructionECContainersCommands.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/protocol/commands/TestReconstructionECContainersCommands.java
@@ -25,6 +25,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
@@ -33,6 +34,19 @@ import java.util.stream.Collectors;
  */
 public class TestReconstructionECContainersCommands {
 
+  @Test
+  public void testExceptionIfSourceAndMissingNotSameLength() {
+    ECReplicationConfig ecReplicationConfig = new ECReplicationConfig(3, 2);
+    byte[] missingContainerIndexes = {1, 2};
+
+    List<DatanodeDetails> targetDns = new ArrayList<>();
+    targetDns.add(MockDatanodeDetails.randomDatanodeDetails());
+
+    Assert.assertThrows(IllegalArgumentException.class,
+        () -> new ReconstructECContainersCommand(1L, Collections.emptyList(),
+        targetDns, missingContainerIndexes, ecReplicationConfig));
+  }
+
   @Test
   public void protobufConversion() {
     byte[] missingContainerIndexes = {1, 2};
@@ -48,7 +62,7 @@ public class TestReconstructionECContainersCommands {
           a -> new ReconstructECContainersCommand
               .DatanodeDetailsAndReplicaIndex(a, dnDetails.indexOf(a)))
         .collect(Collectors.toList());
-    List<DatanodeDetails> targets = getDNDetails(5);
+    List<DatanodeDetails> targets = getDNDetails(2);
     ReconstructECContainersCommand reconstructECContainersCommand =
         new ReconstructECContainersCommand(1L, sources, targets,
             missingContainerIndexes, ecReplicationConfig);
diff --git a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index e998b846df..602805d3f3 100644
--- a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -409,6 +409,7 @@ message ReplicateContainerCommandProto {
   required int64 containerID = 1;
   repeated DatanodeDetailsProto sources = 2;
   required int64 cmdId = 3;
+  optional int32 replicaIndex = 4;
 }
 
 /**
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
index 0f6806e68b..f7a5fd73a8 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java
@@ -81,14 +81,17 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
    * @param remainingMaintenanceRedundancy - represents that how many nodes go
    *                                      into maintenance.
    * @return Returns the key value pair of destination dn where the command gets
-   * executed and the command itself.
+   * executed and the command itself. If an empty map is returned, it indicates
+   * the container is no longer unhealthy and can be removed from the unhealthy
+   * queue. Any exception indicates that the container is still unhealthy and
+   * should be retried later.
    */
   @Override
   public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
       final Set<ContainerReplica> replicas,
       final List<ContainerReplicaOp> pendingOps,
       final ContainerHealthResult result,
-      final int remainingMaintenanceRedundancy) {
+      final int remainingMaintenanceRedundancy) throws IOException {
     ContainerInfo container = result.getContainerInfo();
     ECReplicationConfig repConfig =
         (ECReplicationConfig) container.getReplicationConfig();
@@ -199,6 +202,9 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
             final ReplicateContainerCommand replicateCommand =
                 new ReplicateContainerCommand(id.getProtobuf().getId(),
                     ImmutableList.of(decommissioningSrcNode));
+            // For EC containers, we need to track the replica index which is
+            // to be replicated, so add it to the command.
+            replicateCommand.setReplicaIndex(replica.getReplicaIndex());
             DatanodeDetails target = iterator.next();
             commands.put(target, replicateCommand);
           }
@@ -208,6 +214,7 @@ public class ECUnderReplicationHandler implements UnhealthyReplicationHandler {
       LOG.warn("Exception while processing for creating the EC reconstruction" +
               " container commands for {}.",
           id, ex);
+      throw ex;
     }
     return commands;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 7464cc9605..99a6a8db90 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -40,7 +40,9 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.node.NodeStatus;
 import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.ExitUtil;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -137,6 +139,7 @@ public class ReplicationManager implements SCMService {
   private final ReentrantLock lock = new ReentrantLock();
   private Queue<ContainerHealthResult.UnderReplicatedHealthResult>
       underRepQueue;
+  private final ECUnderReplicationHandler ecUnderReplicationHandler;
 
   /**
    * Constructs ReplicationManager instance with the given configuration.
@@ -174,6 +177,8 @@ public class ReplicationManager implements SCMService {
     this.ecContainerHealthCheck = new ECContainerHealthCheck();
     this.nodeManager = nodeManager;
     this.underRepQueue = createUnderReplicatedQueue();
+    ecUnderReplicationHandler = new ECUnderReplicationHandler(
+        containerPlacement, conf, nodeManager);
     start();
   }
 
@@ -315,6 +320,21 @@ public class ReplicationManager implements SCMService {
     }
   }
 
+  public Map<DatanodeDetails, SCMCommand<?>> processUnderReplicatedContainer(
+      final ContainerHealthResult result) throws IOException {
+    ContainerID containerID = result.getContainerInfo().containerID();
+    Set<ContainerReplica> replicas = containerManager.getContainerReplicas(
+        containerID);
+    List<ContainerReplicaOp> pendingOps =
+        containerReplicaPendingOps.getPendingOps(containerID);
+    return ecUnderReplicationHandler.processAndCreateCommands(replicas,
+        pendingOps, result, 0);
+  }
+
+  public long getScmTerm() throws NotLeaderException {
+    return scmContext.getTermOfLeader();
+  }
+
   protected ContainerHealthResult processContainer(ContainerInfo containerInfo,
       Queue<ContainerHealthResult.UnderReplicatedHealthResult> underRep,
       List<ContainerHealthResult.OverReplicatedHealthResult> overRep,
@@ -431,6 +451,18 @@ public class ReplicationManager implements SCMService {
     )
     private long interval = Duration.ofSeconds(300).toMillis();
 
+    /**
+     * The frequency at which the Under Replicated queue is processed.
+     */
+    @Config(key = "under.replicated.interval",
+        type = ConfigType.TIME,
+        defaultValue = "30s",
+        tags = {SCM, OZONE},
+        description = "How frequently to check if there are work to process " +
+            " on the under replicated queue"
+    )
+    private long underReplicatedInterval = Duration.ofSeconds(30).toMillis();
+
     /**
      * Timeout for container replication & deletion command issued by
      * ReplicationManager.
@@ -474,6 +506,10 @@ public class ReplicationManager implements SCMService {
       return interval;
     }
 
+    public long getUnderReplicatedInterval() {
+      return underReplicatedInterval;
+    }
+
     public long getEventTimeout() {
       return eventTimeout;
     }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
new file mode 100644
index 0000000000..f11356bb6f
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicatedProcessor.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class used to pick messages from the ReplicationManager under replicated
+ * queue, calculate the reconstruction commands and assign to the datanodes
+ * via the eventQueue.
+ */
+public class UnderReplicatedProcessor {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(UnderReplicatedProcessor.class);
+  private final ReplicationManager replicationManager;
+  private final ContainerReplicaPendingOps pendingOps;
+  private final EventPublisher eventPublisher;
+
+  public UnderReplicatedProcessor(ReplicationManager replicationManager,
+      ContainerReplicaPendingOps pendingOps,
+      EventPublisher eventPublisher) {
+    this.replicationManager = replicationManager;
+    this.pendingOps = pendingOps;
+    this.eventPublisher = eventPublisher;
+  }
+
+  /**
+   * Read messages from the ReplicationManager under replicated queue and
+   * form commands to correct the under replication. The commands are added
+   * to the event queue and the PendingReplicaOps are adjusted.
+   *
+   * Note: this is a temporary implementation of this feature. A future
+   * version will need to limit the number of messages assigned to each
+   * datanode, so they are not assigned too much work.
+   */
+  public void processAll() {
+    int processed = 0;
+    int failed = 0;
+    while (true) {
+      ContainerHealthResult.UnderReplicatedHealthResult underRep =
+          replicationManager.dequeueUnderReplicatedContainer();
+      if (underRep == null) {
+        break;
+      }
+      try {
+        processContainer(underRep);
+        processed++;
+      } catch (IOException e) {
+        LOG.error("Error processing under replicated container {}",
+            underRep.getContainerInfo(), e);
+        failed++;
+        replicationManager.requeueUnderReplicatedContainer(underRep);
+      }
+    }
+    LOG.info("Processed {} under replicated containers, failed processing {}",
+        processed, failed);
+  }
+
+  protected void processContainer(ContainerHealthResult
+      .UnderReplicatedHealthResult underRep) throws IOException {
+    Map<DatanodeDetails, SCMCommand<?>> cmds = replicationManager
+        .processUnderReplicatedContainer(underRep);
+    for (Map.Entry<DatanodeDetails, SCMCommand<?>> cmd : cmds.entrySet()) {
+      SCMCommand<?> scmCmd = cmd.getValue();
+      scmCmd.setTerm(replicationManager.getScmTerm());
+      final CommandForDatanode<?> datanodeCommand =
+          new CommandForDatanode<>(cmd.getKey().getUuid(), scmCmd);
+      eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+      adjustPendingOps(underRep.getContainerInfo().containerID(),
+          scmCmd, cmd.getKey());
+    }
+  }
+
+  private void adjustPendingOps(ContainerID containerID, SCMCommand<?> cmd,
+      DatanodeDetails targetDatanode)
+      throws IOException {
+    if (cmd.getType() == StorageContainerDatanodeProtocolProtos
+        .SCMCommandProto.Type.reconstructECContainersCommand) {
+      ReconstructECContainersCommand rcc = (ReconstructECContainersCommand) cmd;
+      List<DatanodeDetails> targets = rcc.getTargetDatanodes();
+      byte[] targetIndexes = rcc.getMissingContainerIndexes();
+      for (int i = 0; i < targetIndexes.length; i++) {
+        pendingOps.scheduleAddReplica(containerID, targets.get(i),
+            targetIndexes[i]);
+      }
+    } else if (cmd.getType() == StorageContainerDatanodeProtocolProtos
+        .SCMCommandProto.Type.replicateContainerCommand) {
+      ReplicateContainerCommand rcc = (ReplicateContainerCommand) cmd;
+      pendingOps.scheduleAddReplica(
+          containerID, targetDatanode, rcc.getReplicaIndex());
+    } else {
+      throw new IOException("Unexpected command type " + cmd.getType());
+    }
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
index cf27fcb51c..a6d37c40f6 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnhealthyReplicationHandler.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 
+import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -41,9 +42,13 @@ public interface UnhealthyReplicationHandler {
    * @param remainingMaintenanceRedundancy - represents that how many nodes go
    *                                      into maintenance.
    * @return Returns the key value pair of destination dn where the command gets
-   * executed and the command itself.
+   * executed and the command itself. If an empty map is returned, it indicates
+   * the container is no longer unhealthy and can be removed from the unhealthy
+   * queue. Any exception indicates that the container is still unhealthy and
+   * should be retried later.
    */
   Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands(
       Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps,
-      ContainerHealthResult result, int remainingMaintenanceRedundancy);
+      ContainerHealthResult result, int remainingMaintenanceRedundancy)
+      throws IOException;
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 3da1d74c81..c0a1afc90f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerManagerImpl;
 import org.apache.hadoop.hdds.scm.container.ContainerManager;
 import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
 import org.apache.hadoop.hdds.scm.container.replication.LegacyReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.UnderReplicatedProcessor;
 import org.apache.hadoop.hdds.scm.crl.CRLStatusReportHandler;
 import org.apache.hadoop.hdds.scm.ha.BackgroundSCMService;
 import org.apache.hadoop.hdds.scm.ha.HASecurityUtils;
@@ -723,6 +724,21 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
           clock,
           legacyRM,
           containerReplicaPendingOps);
+      ReplicationManager.ReplicationManagerConfiguration rmConf = conf
+          .getObject(ReplicationManager.ReplicationManagerConfiguration.class);
+
+      UnderReplicatedProcessor underReplicatedProcessor =
+          new UnderReplicatedProcessor(replicationManager,
+              containerReplicaPendingOps, eventQueue);
+
+      BackgroundSCMService underReplicatedQueueThread =
+          new BackgroundSCMService.Builder().setClock(clock)
+              .setScmContext(scmContext)
+              .setServiceName("UnderReplicatedQueueThread")
+              .setIntervalInMillis(rmConf.getUnderReplicatedInterval())
+              .setWaitTimeInMillis(backgroundServiceSafemodeWaitMs)
+              .setPeriodicalTask(underReplicatedProcessor::processAll).build();
+      serviceManager.register(underReplicatedQueueThread);
     }
     serviceManager.register(replicationManager);
     if (configurator.getScmSafeModeManager() != null) {
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index c220223805..97ca424843 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -39,6 +39,7 @@ import java.util.List;
 import java.util.Set;
 
 import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE;
 
 /**
  * Helper class to provide common methods used to test ReplicationManager.
@@ -142,4 +143,24 @@ public final class ReplicationTestUtil {
       }
     };
   }
+
+  public static PlacementPolicy getNoNodesTestPlacementPolicy(
+      final NodeManager nodeManager, final OzoneConfiguration conf) {
+    return new SCMCommonPlacementPolicy(nodeManager, conf) {
+      @Override
+      public List<DatanodeDetails> chooseDatanodes(
+          List<DatanodeDetails> excludedNodes,
+          List<DatanodeDetails> favoredNodes, int nodesRequiredToChoose,
+          long metadataSizeRequired, long dataSizeRequired)
+          throws SCMException {
+        throw new SCMException("No nodes available",
+            FAILED_TO_FIND_SUITABLE_NODE);
+      }
+
+      @Override
+      public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) {
+        return null;
+      }
+    };
+  }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
index 014306af27..2a5b18b8cf 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestECUnderReplicationHandler.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.ContainerReplica;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.net.NodeSchema;
 import org.apache.hadoop.hdds.scm.net.NodeSchemaManager;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
@@ -42,6 +43,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mockito;
 
+import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -52,6 +54,7 @@ import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalSt
 import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA;
 import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA;
+import static org.junit.Assert.assertThrows;
 
 /**
  * Tests the ECUnderReplicationHandling functionality.
@@ -84,73 +87,96 @@ public class TestECUnderReplicationHandler {
   }
 
   @Test
-  public void testUnderReplicationWithMissingParityIndex5() {
+  public void testUnderReplicationWithMissingParityIndex5() throws IOException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
             Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4));
     testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
-        availableReplicas, 0);
+        availableReplicas, 0, policy);
   }
 
   @Test
-  public void testUnderReplicationWithMissingIndex34() {
+  public void testUnderReplicationWithMissingIndex34() throws IOException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2),
             Pair.of(IN_SERVICE, 5));
     testUnderReplicationWithMissingIndexes(ImmutableList.of(3, 4),
-        availableReplicas, 0);
+        availableReplicas, 0, policy);
   }
 
   @Test
-  public void testUnderReplicationWithMissingIndex2345() {
+  public void testUnderReplicationWithMissingIndex2345() throws IOException {
     Set<ContainerReplica> availableReplicas =
         ReplicationTestUtil.createReplicas(Pair.of(IN_SERVICE, 1));
     testUnderReplicationWithMissingIndexes(ImmutableList.of(2, 3, 4, 5),
-        availableReplicas, 0);
+        availableReplicas, 0, policy);
   }
 
   @Test
-  public void testUnderReplicationWithMissingIndex12345() {
+  public void testUnderReplicationWithMissingIndex12345() throws IOException {
     Set<ContainerReplica> availableReplicas = new HashSet<>();
     testUnderReplicationWithMissingIndexes(ImmutableList.of(1, 2, 3, 4, 5),
-        availableReplicas, 0);
+        availableReplicas, 0, policy);
   }
 
   @Test
-  public void testUnderReplicationWithDecomIndex1() {
+  public void testUnderReplicationWithDecomIndex1() throws IOException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(DECOMMISSIONING, 1), Pair.of(IN_SERVICE, 2),
             Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4),
             Pair.of(IN_SERVICE, 5));
-    testUnderReplicationWithMissingIndexes(Lists.emptyList(), availableReplicas,
-        1);
+    Map<DatanodeDetails, SCMCommand<?>> cmds =
+        testUnderReplicationWithMissingIndexes(
+            Lists.emptyList(), availableReplicas, 1, policy);
+    Assert.assertEquals(1, cmds.size());
+    // Check the replicate command has index 1 set
+    ReplicateContainerCommand cmd = (ReplicateContainerCommand) cmds.values()
+        .iterator().next();
+    Assert.assertEquals(1, cmd.getReplicaIndex());
+
   }
 
   @Test
-  public void testUnderReplicationWithDecomIndex12() {
+  public void testUnderReplicationWithDecomIndex12() throws IOException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(DECOMMISSIONING, 1),
             Pair.of(DECOMMISSIONING, 2), Pair.of(IN_SERVICE, 3),
             Pair.of(IN_SERVICE, 4), Pair.of(IN_SERVICE, 5));
     testUnderReplicationWithMissingIndexes(Lists.emptyList(), availableReplicas,
-        2);
+        2, policy);
   }
 
   @Test
-  public void testUnderReplicationWithMixedDecomAndMissingIndexes() {
+  public void testUnderReplicationWithMixedDecomAndMissingIndexes()
+      throws IOException {
     Set<ContainerReplica> availableReplicas = ReplicationTestUtil
         .createReplicas(Pair.of(DECOMMISSIONING, 1),
             Pair.of(DECOMMISSIONING, 2), Pair.of(IN_SERVICE, 3),
             Pair.of(IN_SERVICE, 4));
     testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
-        availableReplicas, 2);
+        availableReplicas, 2, policy);
+  }
+
+  @Test
+  public void testExceptionIfNoNodesFound() throws IOException {
+    PlacementPolicy noNodesPolicy = ReplicationTestUtil
+        .getNoNodesTestPlacementPolicy(nodeManager, conf);
+    Set<ContainerReplica> availableReplicas = ReplicationTestUtil
+        .createReplicas(Pair.of(DECOMMISSIONING, 1),
+            Pair.of(DECOMMISSIONING, 2), Pair.of(IN_SERVICE, 3),
+            Pair.of(IN_SERVICE, 4));
+    assertThrows(SCMException.class, () ->
+        testUnderReplicationWithMissingIndexes(ImmutableList.of(5),
+            availableReplicas, 2, noNodesPolicy));
+
   }
 
-  public void testUnderReplicationWithMissingIndexes(
+  public Map<DatanodeDetails, SCMCommand<?>>
+      testUnderReplicationWithMissingIndexes(
       List<Integer> missingIndexes, Set<ContainerReplica> availableReplicas,
-      int decomIndexes) {
+      int decomIndexes, PlacementPolicy placementPolicy) throws IOException {
     ECUnderReplicationHandler ecURH =
-        new ECUnderReplicationHandler(policy, conf, nodeManager);
+        new ECUnderReplicationHandler(placementPolicy, conf, nodeManager);
     ContainerHealthResult.UnderReplicatedHealthResult result =
         Mockito.mock(ContainerHealthResult.UnderReplicatedHealthResult.class);
     Mockito.when(result.isUnrecoverable()).thenReturn(false);
@@ -186,5 +212,6 @@ public class TestECUnderReplicationHandler {
     Assert.assertEquals(decomIndexes, replicateCommand);
     Assert.assertEquals(shouldReconstructCommandExist ? 1 : 0,
         reconstructCommand);
+    return datanodeDetailsSCMCommandMap;
   }
 }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
new file mode 100644
index 0000000000..3f3cfeaa8b
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestUnderReplicatedProcessor.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
+import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.ozone.test.TestClock;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.eq;
+
+/**
+ * Tests UnderReplicatedProcessor against a mocked ReplicationManager.
+ */
+public class TestUnderReplicatedProcessor {
+
+  private ConfigurationSource conf;
+  private TestClock clock;
+  private ContainerReplicaPendingOps pendingOps;
+  private ReplicationManager replicationManager;
+  private EventPublisher eventPublisher;
+  private ECReplicationConfig repConfig;
+  private UnderReplicatedProcessor underReplicatedProcessor;
+
+  @Before
+  public void setup() {
+    conf = new OzoneConfiguration();
+    clock = new TestClock(Instant.now(), ZoneId.systemDefault());
+    pendingOps = new ContainerReplicaPendingOps(conf, clock);
+    replicationManager = Mockito.mock(ReplicationManager.class);
+    eventPublisher = Mockito.mock(EventPublisher.class);
+    repConfig = new ECReplicationConfig(3, 2);
+    underReplicatedProcessor = new UnderReplicatedProcessor(
+        replicationManager, pendingOps, eventPublisher);
+  }
+
+  @Test
+  public void testEcReconstructionCommand() throws IOException {
+    ContainerInfo container = ReplicationTestUtil
+        .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
+    Mockito.when(replicationManager.dequeueUnderReplicatedContainer())
+        .thenReturn(new ContainerHealthResult
+                .UnderReplicatedHealthResult(container, 3, false, false, false),
+            null);
+    List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
+        sourceNodes = new ArrayList<>();
+    for (int i = 1; i <= 3; i++) {
+      sourceNodes.add(
+          new ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex(
+              MockDatanodeDetails.randomDatanodeDetails(), i));
+    }
+    List<DatanodeDetails> targetNodes = new ArrayList<>();
+    targetNodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    targetNodes.add(MockDatanodeDetails.randomDatanodeDetails());
+    byte[] missingIndexes = {4, 5};
+
+    Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+    commands.put(MockDatanodeDetails.randomDatanodeDetails(),
+        new ReconstructECContainersCommand(container.getContainerID(),
+            sourceNodes, targetNodes, missingIndexes, repConfig));
+
+    Mockito.when(replicationManager
+            .processUnderReplicatedContainer(Mockito.any()))
+        .thenReturn(commands);
+    underReplicatedProcessor.processAll();
+
+    Mockito.verify(eventPublisher, Mockito.times(1))
+        .fireEvent(eq(SCMEvents.DATANODE_COMMAND), Mockito.any());
+    Mockito.verify(replicationManager, Mockito.times(0))
+        .requeueUnderReplicatedContainer(Mockito.any());
+
+    // Ensure pending ops has one entry per target DN in the command, each
+    // with the replica index found at the same position in missingIndexes.
+    List<ContainerReplicaOp> ops = pendingOps
+        .getPendingOps(container.containerID());
+    Assert.assertEquals(2, ops.size());
+    for (ContainerReplicaOp op : ops) {
+      int ind = targetNodes.indexOf(op.getTarget());
+      Assert.assertEquals(missingIndexes[ind], op.getReplicaIndex());
+    }
+  }
+
+  @Test
+  public void testEcReplicationCommand() throws IOException {
+    ContainerInfo container = ReplicationTestUtil
+        .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
+    Mockito.when(replicationManager.dequeueUnderReplicatedContainer())
+        .thenReturn(new ContainerHealthResult
+                .UnderReplicatedHealthResult(container, 3, true, false, false),
+            null);
+    List<DatanodeDetails> sourceDns = new ArrayList<>();
+    sourceDns.add(MockDatanodeDetails.randomDatanodeDetails());
+    DatanodeDetails targetDn = MockDatanodeDetails.randomDatanodeDetails();
+    ReplicateContainerCommand rcc = new ReplicateContainerCommand(
+        container.getContainerID(), sourceDns);
+    rcc.setReplicaIndex(3);
+
+    Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
+    commands.put(targetDn, rcc);
+
+    Mockito.when(replicationManager
+            .processUnderReplicatedContainer(Mockito.any()))
+        .thenReturn(commands);
+    underReplicatedProcessor.processAll();
+
+    Mockito.verify(eventPublisher, Mockito.times(1))
+        .fireEvent(eq(SCMEvents.DATANODE_COMMAND), Mockito.any());
+    Mockito.verify(replicationManager, Mockito.times(0))
+        .requeueUnderReplicatedContainer(Mockito.any());
+
+    // Ensure pending ops has a single entry for this container and that it
+    // carries the replica index set on the replicate command.
+    List<ContainerReplicaOp> ops = pendingOps
+        .getPendingOps(container.containerID());
+    Assert.assertEquals(1, ops.size());
+    Assert.assertEquals(3, ops.get(0).getReplicaIndex());
+  }
+
+  @Test
+  public void testMessageRequeuedOnException() throws IOException {
+    ContainerInfo container = ReplicationTestUtil
+        .createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig);
+    Mockito.when(replicationManager.dequeueUnderReplicatedContainer())
+        .thenReturn(new ContainerHealthResult
+                .UnderReplicatedHealthResult(container, 3, false, false, false),
+            null);
+
+    Mockito.when(replicationManager
+            .processUnderReplicatedContainer(Mockito.any()))
+        .thenThrow(new IOException("Test Exception"));
+    underReplicatedProcessor.processAll();
+
+    Mockito.verify(eventPublisher, Mockito.times(0))
+        .fireEvent(eq(SCMEvents.DATANODE_COMMAND), Mockito.any());
+    Mockito.verify(replicationManager, Mockito.times(1))
+        .requeueUnderReplicatedContainer(Mockito.any());
+
+    // No command was fired, so pending ops must be empty for this container.
+    List<ContainerReplicaOp> ops = pendingOps
+        .getPendingOps(container.containerID());
+    Assert.assertEquals(0, ops.size());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org