You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2022/10/18 13:37:05 UTC

[ozone] branch master updated: HDDS-6893. EC: ReplicationManager - move the empty container handling into RM from Legacy (#3831)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 02c266d3fe HDDS-6893. EC: ReplicationManager - move the empty container handling into RM from Legacy (#3831)
02c266d3fe is described below

commit 02c266d3fef2fd873d9eca03c86b6910a91b6f52
Author: Siddhant Sangwan <si...@gmail.com>
AuthorDate: Tue Oct 18 19:06:59 2022 +0530

    HDDS-6893. EC: ReplicationManager - move the empty container handling into RM from Legacy (#3831)
---
 .../protocol/commands/DeleteContainerCommand.java  |   6 +
 .../container/replication/ReplicationManager.java  |  35 +++-
 .../replication/health/EmptyContainerHandler.java  | 141 +++++++++++++
 .../container/replication/ReplicationTestUtil.java |  39 +++-
 .../health/TestEmptyContainerHandler.java          | 233 +++++++++++++++++++++
 5 files changed, 448 insertions(+), 6 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteContainerCommand.java
index 8809d0c22a..bb9035f296 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteContainerCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteContainerCommand.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeleteContainerCommandProto;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
 
 /**
  * SCM command which tells the datanode to delete a container.
@@ -56,6 +57,11 @@ public class DeleteContainerCommand extends
     this.force = forceFlag;
   }
 
+  public DeleteContainerCommand(ContainerID containerID, boolean forceFlag) {
+    this.containerId = containerID.getId();
+    this.force = forceFlag;
+  }
+
   public void setReplicaIndex(int index) {
     replicaIndex = index;
   }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index 85bbaf2c77..05300d6a08 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeleteContainerCommandProto;
 import org.apache.hadoop.hdds.scm.PlacementPolicy;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@@ -51,6 +52,7 @@ import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
@@ -362,6 +364,37 @@ public class ReplicationManager implements SCMService {
     eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
   }
 
+  /**
+   * Sends delete container command for the given container to the given
+   * datanode.
+   *
+   * @param container Container to be deleted
+   * @param replicaIndex Index of the container replica to be deleted
+   * @param datanode  The datanode on which the replica should be deleted
+   * @throws NotLeaderException when this SCM is not the leader
+   */
+  public void sendDeleteCommand(final ContainerInfo container, int replicaIndex,
+      final DatanodeDetails datanode) throws NotLeaderException {
+    LOG.info("Sending delete container command for container {}" +
+        " to datanode {}", container.containerID(), datanode);
+
+    final DeleteContainerCommand deleteCommand =
+        new DeleteContainerCommand(container.containerID(), false);
+    deleteCommand.setTerm(getScmTerm());
+
+    final CommandForDatanode<DeleteContainerCommandProto> datanodeCommand =
+        new CommandForDatanode<>(datanode.getUuid(), deleteCommand);
+    eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND, datanodeCommand);
+    containerReplicaPendingOps.scheduleDeleteReplica(container.containerID(),
+        datanode, replicaIndex);
+
+    synchronized (this) {
+      metrics.incrNumDeletionCmdsSent();
+      metrics.incrNumDeletionBytesTotal(container.getUsedBytes());
+    }
+  }
+
+
   /**
    * Add an under replicated container back to the queue if it was unable to
    * be processed. Its retry count will be incremented before it is re-queued,
@@ -757,7 +790,7 @@ public class ReplicationManager implements SCMService {
     return ReplicationManager.class.getSimpleName();
   }
 
-  public ReplicationManagerMetrics getMetrics() {
+  public synchronized ReplicationManagerMetrics getMetrics() {
     return metrics;
   }
 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/EmptyContainerHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/EmptyContainerHandler.java
new file mode 100644
index 0000000000..47ac50ce3a
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/health/EmptyContainerHandler.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.ratis.protocol.exceptions.NotLeaderException;
+import org.apache.ratis.util.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This handler deletes a container if it's closed and empty (0 key count)
+ * and all its replicas are closed and empty.
+ */
+public class EmptyContainerHandler extends AbstractCheck {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(EmptyContainerHandler.class);
+
+  private final ReplicationManager replicationManager;
+  private final ContainerManager containerManager;
+
+  public EmptyContainerHandler(ReplicationManager replicationManager,
+      ContainerManager containerManager) {
+    this.replicationManager = replicationManager;
+    this.containerManager = containerManager;
+  }
+
+  /**
+   * Deletes a container if it's closed and empty (0 key count) and all its
+   * replicas are closed and empty.
+   * @param request ContainerCheckRequest object representing the container
+   * @return true if container and replicas are closed and empty, else false
+   */
+  @Override
+  public boolean handle(ContainerCheckRequest request) {
+    ContainerInfo containerInfo = request.getContainerInfo();
+    Set<ContainerReplica> replicas = request.getContainerReplicas();
+
+    if (isContainerEmptyAndClosed(containerInfo, replicas)) {
+      request.getReport()
+          .incrementAndSample(ReplicationManagerReport.HealthState.EMPTY,
+              containerInfo.containerID());
+
+      // delete replicas if they are closed and empty
+      deleteContainerReplicas(containerInfo, replicas);
+
+      // Update the container's state
+      try {
+        containerManager.updateContainerState(containerInfo.containerID(),
+            HddsProtos.LifeCycleEvent.DELETE);
+      } catch (IOException | InvalidStateTransitionException |
+          TimeoutException e) {
+        LOG.error("Failed to delete empty container {}",
+            request.getContainerInfo(), e);
+      }
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Returns true if the container and all its replicas are empty and CLOSED.
+   * A container is empty if its key count is 0. The usedBytes counter is not
+   * checked here because usedBytes is not an accurate representation of the
+   * committed blocks. There could be orphaned chunks in the container which
+   * contribute to usedBytes.
+   *
+   * @param container Container to check
+   * @param replicas Set of ContainerReplica
+   * @return true if the container is considered empty, false otherwise
+   */
+  private boolean isContainerEmptyAndClosed(final ContainerInfo container,
+      final Set<ContainerReplica> replicas) {
+    return container.getState() == HddsProtos.LifeCycleState.CLOSED &&
+        container.getNumberOfKeys() == 0 && replicas.stream()
+        .allMatch(r -> r.getState() == ContainerReplicaProto.State.CLOSED &&
+            r.getKeyCount() == 0);
+  }
+
+  /**
+   * Deletes the specified container's replicas if they are closed and empty.
+   *
+   * @param containerInfo ContainerInfo to delete
+   * @param replicas Set of ContainerReplica
+   */
+  private void deleteContainerReplicas(final ContainerInfo containerInfo,
+      final Set<ContainerReplica> replicas) {
+    Preconditions.assertTrue(containerInfo.getState() ==
+        HddsProtos.LifeCycleState.CLOSED);
+    Preconditions.assertTrue(containerInfo.getNumberOfKeys() == 0);
+
+    for (ContainerReplica rp : replicas) {
+      Preconditions.assertTrue(
+          rp.getState() == ContainerReplicaProto.State.CLOSED);
+      Preconditions.assertTrue(rp.getKeyCount() == 0);
+
+      LOG.debug("Trying to delete empty replica with index {} for container " +
+              "{} on datanode {}", rp.getReplicaIndex(),
+          containerInfo.containerID(), rp.getDatanodeDetails().getUuidString());
+      try {
+        replicationManager.sendDeleteCommand(containerInfo,
+            rp.getReplicaIndex(), rp.getDatanodeDetails());
+      } catch (NotLeaderException e) {
+        LOG.warn("Failed to delete empty replica with index {} for container" +
+                " {} on datanode {}",
+            rp.getReplicaIndex(),
+            containerInfo.containerID(),
+            rp.getDatanodeDetails().getUuidString(), e);
+      }
+    }
+  }
+
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
index 6cfc0eeaff..f87d285158 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationTestUtil.java
@@ -84,6 +84,19 @@ public final class ReplicationTestUtil {
     return replicas;
   }
 
+  public static Set<ContainerReplica> createReplicas(ContainerID containerID,
+      ContainerReplicaProto.State replicaState, long keyCount, long bytesUsed,
+      int... indexes) {
+    Set<ContainerReplica> replicas = new HashSet<>();
+    for (int i : indexes) {
+      DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
+      replicas.add(createContainerReplica(containerID, i, IN_SERVICE,
+          replicaState, keyCount, bytesUsed,
+          dn, dn.getUuid()));
+    }
+    return replicas;
+  }
+
   public static Set<ContainerReplica> createReplicasWithSameOrigin(
       ContainerID containerID, ContainerReplicaProto.State replicaState,
       int... indexes) {
@@ -91,7 +104,7 @@ public final class ReplicationTestUtil {
     UUID originNodeId = MockDatanodeDetails.randomDatanodeDetails().getUuid();
     for (int i : indexes) {
       replicas.add(createContainerReplica(
-          containerID, i, IN_SERVICE, replicaState,
+          containerID, i, IN_SERVICE, replicaState, 123L, 1234L,
           MockDatanodeDetails.randomDatanodeDetails(), originNodeId));
     }
     return replicas;
@@ -103,20 +116,22 @@ public final class ReplicationTestUtil {
     DatanodeDetails datanodeDetails
         = MockDatanodeDetails.randomDatanodeDetails();
     return createContainerReplica(containerID, replicaIndex, opState,
-        replicaState, datanodeDetails, datanodeDetails.getUuid());
+        replicaState, 123L, 1234L,
+        datanodeDetails, datanodeDetails.getUuid());
   }
 
+  @SuppressWarnings("checkstyle:ParameterNumber")
   public static ContainerReplica createContainerReplica(ContainerID containerID,
       int replicaIndex, HddsProtos.NodeOperationalState opState,
-      ContainerReplicaProto.State replicaState,
+      ContainerReplicaProto.State replicaState, long keyCount, long bytesUsed,
       DatanodeDetails datanodeDetails, UUID originNodeId) {
     ContainerReplica.ContainerReplicaBuilder builder
         = ContainerReplica.newBuilder();
     datanodeDetails.setPersistedOpState(opState);
     builder.setContainerID(containerID);
     builder.setReplicaIndex(replicaIndex);
-    builder.setKeyCount(123);
-    builder.setBytesUsed(1234);
+    builder.setKeyCount(keyCount);
+    builder.setBytesUsed(bytesUsed);
     builder.setContainerState(replicaState);
     builder.setDatanodeDetails(datanodeDetails);
     builder.setSequenceId(0);
@@ -141,6 +156,20 @@ public final class ReplicationTestUtil {
     return builder.build();
   }
 
+  public static ContainerInfo createContainerInfo(
+      ReplicationConfig replicationConfig, long containerID,
+      HddsProtos.LifeCycleState containerState, long keyCount, long bytesUsed) {
+    ContainerInfo.Builder builder = new ContainerInfo.Builder();
+    builder.setContainerID(containerID);
+    builder.setOwner("Ozone");
+    builder.setPipelineID(PipelineID.randomId());
+    builder.setReplicationConfig(replicationConfig);
+    builder.setState(containerState);
+    builder.setNumberOfKeys(keyCount);
+    builder.setUsedBytes(bytesUsed);
+    return builder.build();
+  }
+
   public static ContainerInfo createContainer(HddsProtos.LifeCycleState state,
       ReplicationConfig replicationConfig) {
     return new ContainerInfo.Builder()
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestEmptyContainerHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestEmptyContainerHandler.java
new file mode 100644
index 0000000000..ac746900f5
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/health/TestEmptyContainerHandler.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication.health;
+
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
+import org.apache.hadoop.hdds.client.RatisReplicationConfig;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.ReplicationManagerReport;
+import org.apache.hadoop.hdds.scm.container.replication.ContainerCheckRequest;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationTestUtil;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSED;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.CLOSING;
+
+/**
+ * Tests for {@link EmptyContainerHandler}.
+ */
+public class TestEmptyContainerHandler {
+  private ReplicationManager replicationManager;
+  private ContainerManager containerManager;
+  private EmptyContainerHandler emptyContainerHandler;
+  private ECReplicationConfig ecReplicationConfig;
+  private RatisReplicationConfig ratisReplicationConfig;
+
+  @BeforeEach
+  public void setup()
+      throws IOException, InvalidStateTransitionException, TimeoutException {
+    ecReplicationConfig = new ECReplicationConfig(3, 2);
+    ratisReplicationConfig = RatisReplicationConfig.getInstance(
+        HddsProtos.ReplicationFactor.THREE);
+    replicationManager = Mockito.mock(ReplicationManager.class);
+    containerManager = Mockito.mock(ContainerManager.class);
+
+    emptyContainerHandler =
+        new EmptyContainerHandler(replicationManager, containerManager);
+  }
+
+  /**
+   * A container is considered empty if it has 0 key count. Handler should
+   * return true for empty and CLOSED EC containers.
+   */
+  @Test
+  public void testEmptyAndClosedECContainerReturnsTrue()
+      throws InvalidStateTransitionException, IOException, TimeoutException {
+    long keyCount = 0L;
+    long bytesUsed = 123L;
+    ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+        ecReplicationConfig, 1, CLOSED, keyCount, bytesUsed);
+    Set<ContainerReplica> containerReplicas = ReplicationTestUtil
+        .createReplicas(containerInfo.containerID(),
+            ContainerReplicaProto.State.CLOSED, keyCount, bytesUsed,
+            1, 2, 3, 4, 5);
+
+    ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+        .setPendingOps(Collections.emptyList())
+        .setReport(new ReplicationManagerReport())
+        .setContainerInfo(containerInfo)
+        .setContainerReplicas(containerReplicas)
+        .build();
+
+    assertAndVerify(request, true, 5, 1);
+  }
+
+  @Test
+  public void testEmptyAndClosedRatisContainerReturnsTrue()
+      throws InvalidStateTransitionException, IOException, TimeoutException {
+    long keyCount = 0L;
+    long bytesUsed = 123L;
+    ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+        ratisReplicationConfig, 1, CLOSED, keyCount, bytesUsed);
+    Set<ContainerReplica> containerReplicas = ReplicationTestUtil
+        .createReplicas(containerInfo.containerID(),
+            ContainerReplicaProto.State.CLOSED, keyCount, bytesUsed,
+            0, 0, 0);
+
+    ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+        .setPendingOps(Collections.emptyList())
+        .setReport(new ReplicationManagerReport())
+        .setContainerInfo(containerInfo)
+        .setContainerReplicas(containerReplicas)
+        .build();
+
+    assertAndVerify(request, true, 3, 1);
+  }
+
+  /**
+   * Handler should return false when key count is 0 but the container is not
+   * in CLOSED state.
+   */
+  @Test
+  public void testEmptyAndNonClosedECContainerReturnsFalse()
+      throws InvalidStateTransitionException, IOException, TimeoutException {
+    long keyCount = 0L;
+    long bytesUsed = 123L;
+    ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+        ecReplicationConfig, 1, CLOSING, keyCount, bytesUsed);
+
+    // though key count is 0, the container and its replicas are not CLOSED
+    Set<ContainerReplica> containerReplicas = ReplicationTestUtil
+        .createReplicas(containerInfo.containerID(),
+            ContainerReplicaProto.State.OPEN, keyCount, bytesUsed,
+            1, 2, 3, 4, 5);
+
+    ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+        .setPendingOps(Collections.emptyList())
+        .setReport(new ReplicationManagerReport())
+        .setContainerInfo(containerInfo)
+        .setContainerReplicas(containerReplicas)
+        .build();
+
+    assertAndVerify(request, false, 0, 0);
+  }
+
+  /**
+   * This test exists to verify that the definition of an empty container is
+   * 0 key count. The number of used bytes is not considered.
+   */
+  @Test
+  public void testNonEmptyRatisContainerReturnsFalse()
+      throws InvalidStateTransitionException, IOException, TimeoutException {
+    long keyCount = 5L;
+    long bytesUsed = 123L;
+    ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+        ratisReplicationConfig, 1, CLOSED, keyCount, bytesUsed);
+
+    // though container and its replicas are CLOSED, key count is not 0
+    Set<ContainerReplica> containerReplicas = ReplicationTestUtil
+        .createReplicas(containerInfo.containerID(),
+            ContainerReplicaProto.State.CLOSED, keyCount, bytesUsed,
+            0, 0, 0);
+
+    ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+        .setPendingOps(Collections.emptyList())
+        .setReport(new ReplicationManagerReport())
+        .setContainerInfo(containerInfo)
+        .setContainerReplicas(containerReplicas)
+        .build();
+
+    assertAndVerify(request, false, 0, 0);
+  }
+
+  /**
+   * Handler should return false when there is a non-empty replica.
+   */
+  @Test
+  public void testEmptyECContainerWithNonEmptyReplicaReturnsFalse()
+      throws InvalidStateTransitionException, IOException, TimeoutException {
+    ContainerInfo containerInfo = ReplicationTestUtil.createContainerInfo(
+        ecReplicationConfig, 1, CLOSED, 0L, 0L);
+    Set<ContainerReplica> containerReplicas = ReplicationTestUtil
+        .createReplicas(containerInfo.containerID(),
+            ContainerReplicaProto.State.CLOSED, 0L, 0L,
+            1, 2, 3, 4);
+
+    // add a non-empty replica
+    DatanodeDetails mockDn = MockDatanodeDetails.randomDatanodeDetails();
+    containerReplicas.add(
+        ReplicationTestUtil.createContainerReplica(containerInfo.containerID(),
+            5, HddsProtos.NodeOperationalState.IN_SERVICE,
+            ContainerReplicaProto.State.CLOSED, 1L, 100L, mockDn,
+            mockDn.getUuid()));
+
+    ContainerCheckRequest request = new ContainerCheckRequest.Builder()
+        .setPendingOps(Collections.emptyList())
+        .setReport(new ReplicationManagerReport())
+        .setContainerInfo(containerInfo)
+        .setContainerReplicas(containerReplicas)
+        .build();
+
+    // should return false because there is a non-empty replica
+    assertAndVerify(request, false, 0, 0);
+  }
+
+  /**
+   * Asserts that the handler returns the expected result and that the delete
+   * command is sent to replicas the specified number of times.
+   * @param request ContainerCheckRequest object to pass to the handler
+   * @param assertion true if expecting the handler to return true, else false
+   * @param times number of times the delete command is expected to be sent
+   * @param numEmptyExpected number of EMPTY and CLOSED containers expected
+   *                         to be found in ReplicationManagerReport
+   */
+  private void assertAndVerify(ContainerCheckRequest request,
+      boolean assertion, int times, long numEmptyExpected)
+      throws IOException, InvalidStateTransitionException, TimeoutException {
+    Assertions.assertEquals(assertion, emptyContainerHandler.handle(request));
+    Mockito.verify(replicationManager, Mockito.times(times))
+        .sendDeleteCommand(Mockito.any(ContainerInfo.class), Mockito.anyInt(),
+            Mockito.any(DatanodeDetails.class));
+    Assertions.assertEquals(numEmptyExpected, request.getReport().getStat(
+        ReplicationManagerReport.HealthState.EMPTY));
+
+    if (times > 0) {
+      Mockito.verify(containerManager, Mockito.times(1))
+          .updateContainerState(Mockito.any(ContainerID.class),
+              Mockito.any(HddsProtos.LifeCycleEvent.class));
+    }
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org