You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ozone.apache.org by so...@apache.org on 2022/06/14 12:29:20 UTC

[ozone] branch master updated: HDDS-6260. EC: Standalone containers should not move to quasi-closed (#3439)

This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 47eddeb371 HDDS-6260. EC: Standalone containers should not move to quasi-closed (#3439)
47eddeb371 is described below

commit 47eddeb371e6c766e3240e21f75fee1619346958
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Tue Jun 14 13:29:15 2022 +0100

    HDDS-6260. EC: Standalone containers should not move to quasi-closed (#3439)
---
 .../CloseContainerCommandHandler.java              |   4 +
 .../TestCloseContainerCommandHandler.java          |  17 +-
 .../scm/container/CloseContainerEventHandler.java  |  12 +-
 .../container/TestCloseContainerEventHandler.java  | 318 +++++++++------------
 4 files changed, 170 insertions(+), 181 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index e3d2551f32..6e9082f1e2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -103,6 +103,10 @@ public class CloseContainerCommandHandler implements CommandHandler {
                   command.getEncodedToken());
           ozoneContainer.getWriteChannel()
               .submitRequest(request, closeCommand.getPipelineID());
+        } else if (closeCommand.getForce()) {
+          // Non-RATIS containers should have the force close flag set, so they
+          // are moved to CLOSED immediately rather than going to quasi-closed.
+          controller.closeContainer(containerId);
         } else {
           controller.quasiCloseContainer(containerId);
           LOG.info("Marking Container {} quasi closed", containerId);
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
index de9968128e..c30ded0051 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
@@ -144,6 +144,17 @@ public class TestCloseContainerCommandHandler {
         .quasiCloseContainer(container);
   }
 
+  @Test
+  public void closeContainerWithForceFlagSet() throws IOException {
+    // force-close a container that is not associated with any pipeline
+    subject.handle(forceCloseWithoutPipeline(), ozoneContainer, context, null);
+
+    verify(containerHandler)
+        .markContainerForClose(container);
+    verify(writeChannel, never()).submitRequest(any(), any());
+    verify(containerHandler).closeContainer(container);
+  }
+
   @Test
   public void forceCloseQuasiClosedContainer() throws Exception {
     // force-close a container that's already quasi closed
@@ -165,10 +176,10 @@ public class TestCloseContainerCommandHandler {
 
     verify(writeChannel, never())
         .submitRequest(any(), any());
-    // Container in CLOSING state is moved to UNHEALTHY if pipeline does not
-    // exist. Container should not exist in CLOSING state without a pipeline.
+    // Container in CLOSING state is moved to CLOSED if pipeline does not
+    // exist and force is set to TRUE.
     verify(containerHandler)
-        .quasiCloseContainer(container);
+        .closeContainer(container);
   }
 
   @Test
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index ca6beee4ba..1aab691f1e 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.stream.Collectors;
 
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
@@ -80,8 +81,17 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
           .getContainer(containerID);
       // Send close command to datanodes, if the container is in CLOSING state
       if (container.getState() == LifeCycleState.CLOSING) {
+        boolean force = false;
+        // Any container that is not of type RATIS should be moved to CLOSED
+        // immediately on the DNs. Setting force to true avoids the container
+        // going into the QUASI_CLOSED state, which is only applicable for RATIS
+        // containers.
+        if (container.getReplicationConfig().getReplicationType()
+            != HddsProtos.ReplicationType.RATIS) {
+          force = true;
+        }
         SCMCommand<?> command = new CloseContainerCommand(
-            containerID.getId(), container.getPipelineID());
+            containerID.getId(), container.getPipelineID(), force);
         command.setTerm(scmContext.getTermOfLeader());
         command.setEncodedToken(getContainerToken(containerID));
 
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index 7e46c3e2a1..449c6f7e8d 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -17,212 +17,176 @@
 
 package org.apache.hadoop.hdds.scm.container;
 
-import java.io.File;
 import java.io.IOException;
-import java.time.ZoneId;
-import java.time.ZoneOffset;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
 
-import org.apache.hadoop.conf.StorageUnit;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.client.ECReplicationConfig;
 import org.apache.hadoop.hdds.client.RatisReplicationConfig;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.client.ReplicationConfig;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.HddsTestUtils;
-import org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaPendingOps;
-import org.apache.hadoop.hdds.scm.ha.SCMHAManagerStub;
 import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.ha.SCMService.Event;
-import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
-import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
-import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
-import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
-import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
-import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
-import org.apache.hadoop.hdds.server.events.EventQueue;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.common.MonotonicClock;
-import org.apache.hadoop.ozone.container.common.SCMTestUtils;
-import org.apache.ozone.test.GenericTestUtils;
-
-import org.apache.commons.lang3.RandomUtils;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.THREE;
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
 
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
+import org.junit.Assert;
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
 
 /**
  * Tests the closeContainerEventHandler class.
  */
 public class TestCloseContainerEventHandler {
 
-  private static OzoneConfiguration configuration;
-  private static MockNodeManager nodeManager;
-  private static PipelineManagerImpl pipelineManager;
-  private static ContainerManager containerManager;
-  private static long size;
-  private static File testDir;
-  private static EventQueue eventQueue;
-  private static SCMContext scmContext;
-  private static SCMMetadataStore scmMetadataStore;
-  private static SCMHAManager scmhaManager;
-  private static SequenceIdGenerator sequenceIdGen;
-
-  @BeforeAll
-  public static void setUp() throws Exception {
-    configuration = SCMTestUtils.getConf();
-    size = (long)configuration.getStorageSize(OZONE_SCM_CONTAINER_SIZE,
-        OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
-    testDir = GenericTestUtils
-        .getTestDir(TestCloseContainerEventHandler.class.getSimpleName());
-    configuration
-        .set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
-    configuration.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 16);
-    nodeManager = new MockNodeManager(true, 10);
-    eventQueue = new EventQueue();
-    scmContext = SCMContext.emptyContext();
-    scmMetadataStore = new SCMMetadataStoreImpl(configuration);
-    scmhaManager = SCMHAManagerStub.getInstance(true);
-    sequenceIdGen = new SequenceIdGenerator(
-        configuration, scmhaManager, scmMetadataStore.getSequenceIdTable());
-
-    SCMServiceManager serviceManager = new SCMServiceManager();
-
-    pipelineManager =
-        PipelineManagerImpl.newPipelineManager(
-            configuration,
-            scmhaManager,
-            nodeManager,
-            scmMetadataStore.getPipelineTable(),
-            eventQueue,
-            scmContext,
-            serviceManager,
-            new MonotonicClock(ZoneOffset.UTC));
-
-    PipelineProvider mockRatisProvider =
-        new MockRatisPipelineProvider(nodeManager,
-            pipelineManager.getStateManager(), configuration, eventQueue);
-    pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
-        mockRatisProvider);
-    containerManager = new ContainerManagerImpl(configuration,
-        scmhaManager,
-        sequenceIdGen,
-        pipelineManager,
-        scmMetadataStore.getContainerTable(),
-        new ContainerReplicaPendingOps(configuration,
-            new MonotonicClock(ZoneId.systemDefault())));
-
-    // trigger BackgroundPipelineCreator to take effect.
-    serviceManager.notifyEventTriggered(Event.PRE_CHECK_COMPLETED);
-
-    eventQueue.addHandler(CLOSE_CONTAINER,
-        new CloseContainerEventHandler(
-            pipelineManager, containerManager, scmContext));
-    eventQueue.addHandler(DATANODE_COMMAND, nodeManager);
-    // Move all pipelines created by background from ALLOCATED to OPEN state
-    Thread.sleep(2000);
-    HddsTestUtils.openAllRatisPipelines(pipelineManager);
+  private static final ReplicationConfig RATIS_REP_CONFIG
+      = RatisReplicationConfig.getInstance(THREE);
+  private static final ReplicationConfig EC_REP_CONFIG
+      = new ECReplicationConfig(3, 2);
+
+  private ContainerManager containerManager;
+  private PipelineManager pipelineManager;
+  private EventPublisher eventPublisher;
+  private CloseContainerEventHandler eventHandler;
+
+  @Captor
+  private ArgumentCaptor<CommandForDatanode> commandCaptor;
+
+  @BeforeEach
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    containerManager = Mockito.mock(ContainerManager.class);
+    pipelineManager = Mockito.mock(PipelineManager.class);
+    SCMContext scmContext = Mockito.mock(SCMContext.class);
+    eventPublisher = Mockito.mock(EventPublisher.class);
+    eventHandler = new CloseContainerEventHandler(
+        pipelineManager, containerManager, scmContext);
   }
 
-  @AfterAll
-  public static void tearDown() throws Exception {
-    if (containerManager != null) {
-      containerManager.close();
-    }
-    if (pipelineManager != null) {
-      pipelineManager.close();
-    }
-    if (scmMetadataStore.getStore() != null) {
-      scmMetadataStore.getStore().close();
-    }
-    FileUtil.fullyDelete(testDir);
+  @Test
+  public void testCloseContainerEventWithInvalidContainer()
+      throws ContainerNotFoundException, PipelineNotFoundException {
+    Mockito.when(containerManager.getContainer(any()))
+        .thenThrow(ContainerNotFoundException.class);
+    Mockito.when(pipelineManager.getPipeline(any())).thenReturn(
+        createPipeline(RATIS_REP_CONFIG, 3));
+
+    eventHandler.onMessage(ContainerID.valueOf(1234), eventPublisher);
+    Mockito.verify(eventPublisher, never()).fireEvent(any(), any());
   }
 
   @Test
-  public void testIfCloseContainerEventHadnlerInvoked() {
-    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
-        .captureLogs(CloseContainerEventHandler.LOG);
-    eventQueue.fireEvent(CLOSE_CONTAINER,
-        ContainerID.valueOf(Math.abs(RandomUtils.nextInt())));
-    eventQueue.processAll(1000);
-    Assertions.assertTrue(logCapturer.getOutput()
-        .contains("Close container Event triggered for container"));
+  public void testCloseContainerInInvalidState()
+      throws ContainerNotFoundException {
+    final Pipeline pipeline = createPipeline(RATIS_REP_CONFIG, 3);
+    final ContainerInfo container =
+        createContainer(RATIS_REP_CONFIG, pipeline.getId());
+    container.setState(HddsProtos.LifeCycleState.CLOSED);
+    Mockito.when(containerManager.getContainer(container.containerID()))
+        .thenReturn(container);
+
+    eventHandler.onMessage(container.containerID(), eventPublisher);
+    Mockito.verify(eventPublisher, never())
+        .fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture());
   }
 
   @Test
-  public void testCloseContainerEventWithInvalidContainer() {
-    long id = Math.abs(RandomUtils.nextInt());
-    GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
-        .captureLogs(CloseContainerEventHandler.LOG);
-    eventQueue.fireEvent(CLOSE_CONTAINER,
-        ContainerID.valueOf(id));
-    eventQueue.processAll(1000);
-    Assertions.assertTrue(logCapturer.getOutput()
-        .contains("Failed to close the container"));
+  public void testCloseContainerEventWithRatisContainers()
+      throws IOException, InvalidStateTransitionException {
+    closeContainerForValidContainer(RATIS_REP_CONFIG, 3, false);
   }
 
   @Test
-  public void testCloseContainerEventWithValidContainers() throws IOException {
-    ContainerInfo container = containerManager
-        .allocateContainer(RatisReplicationConfig.getInstance(
-            ReplicationFactor.ONE), OzoneConsts.OZONE);
-    ContainerID id = container.containerID();
-    DatanodeDetails datanode = pipelineManager
-        .getPipeline(container.getPipelineID()).getFirstNode();
-    int closeCount = nodeManager.getCommandCount(datanode);
-    eventQueue.fireEvent(CLOSE_CONTAINER, id);
-    eventQueue.processAll(1000);
-    Assertions.assertEquals(closeCount + 1,
-        nodeManager.getCommandCount(datanode));
-    Assertions.assertEquals(HddsProtos.LifeCycleState.CLOSING,
-        containerManager.getContainer(id).getState());
+  public void testCloseContainerEventECContainer()
+      throws InvalidStateTransitionException, IOException {
+    closeContainerForValidContainer(EC_REP_CONFIG, 5, true);
   }
 
-  @Test
-  public void testCloseContainerEventWithRatis() throws IOException {
-    GenericTestUtils.LogCapturer
-        .captureLogs(CloseContainerEventHandler.LOG);
-    ContainerInfo container = containerManager
-        .allocateContainer(RatisReplicationConfig.getInstance(
-            ReplicationFactor.THREE), OzoneConsts.OZONE);
-    ContainerID id = container.containerID();
-    int[] closeCount = new int[3];
-    eventQueue.fireEvent(CLOSE_CONTAINER, id);
-    eventQueue.processAll(1000);
-    int i = 0;
-    for (DatanodeDetails details : pipelineManager
-        .getPipeline(container.getPipelineID()).getNodes()) {
-      closeCount[i] = nodeManager.getCommandCount(details);
-      i++;
-    }
-    i = 0;
-    for (DatanodeDetails details : pipelineManager
-        .getPipeline(container.getPipelineID()).getNodes()) {
-      Assertions.assertEquals(closeCount[i],
-          nodeManager.getCommandCount(details));
-      i++;
+  private void closeContainerForValidContainer(ReplicationConfig repConfig,
+      int nodeCount, boolean forceClose)
+      throws IOException, InvalidStateTransitionException {
+    final Pipeline pipeline = createPipeline(repConfig, nodeCount);
+    final ContainerInfo container =
+        createContainer(repConfig, pipeline.getId());
+    Mockito.when(containerManager.getContainer(container.containerID()))
+        .thenReturn(container);
+    Mockito.doAnswer(
+        i -> {
+          container.setState(HddsProtos.LifeCycleState.CLOSING);
+          return null;
+        }).when(containerManager).updateContainerState(container.containerID(),
+        HddsProtos.LifeCycleEvent.FINALIZE);
+    Mockito.when(pipelineManager.getPipeline(pipeline.getId()))
+        .thenReturn(pipeline);
+
+    eventHandler.onMessage(container.containerID(), eventPublisher);
+
+    Mockito.verify(containerManager).updateContainerState(any(), any());
+    Mockito.verify(eventPublisher, times(nodeCount))
+        .fireEvent(eq(DATANODE_COMMAND), commandCaptor.capture());
+
+    List<CommandForDatanode> cmds = commandCaptor.getAllValues();
+    Set<UUID> pipelineDNs = pipeline
+        .getNodes()
+        .stream()
+        .map(d -> d.getUuid())
+        .collect(Collectors.toSet());
+    for (CommandForDatanode c : cmds) {
+      Assert.assertTrue(pipelineDNs.contains(c.getDatanodeId()));
+      pipelineDNs.remove(c.getDatanodeId());
+      CloseContainerCommand ccc = (CloseContainerCommand)c.getCommand();
+      Assert.assertEquals(container.getContainerID(), ccc.getContainerID());
+      Assert.assertEquals(pipeline.getId(), ccc.getPipelineID());
+      Assert.assertEquals(forceClose, ccc.getProto().getForce());
     }
-    eventQueue.fireEvent(CLOSE_CONTAINER, id);
-    eventQueue.processAll(1000);
-    i = 0;
-    // Make sure close is queued for each datanode on the pipeline
-    for (DatanodeDetails details : pipelineManager
-        .getPipeline(container.getPipelineID()).getNodes()) {
-      Assertions.assertEquals(closeCount[i] + 1,
-          nodeManager.getCommandCount(details));
-      Assertions.assertEquals(HddsProtos.LifeCycleState.CLOSING,
-          containerManager.getContainer(id).getState());
-      i++;
+    Assert.assertEquals(0, pipelineDNs.size());
+  }
+
+  private Pipeline createPipeline(ReplicationConfig repConfig, int nodes) {
+    Pipeline.Builder builder = Pipeline.newBuilder();
+    builder.setId(PipelineID.randomId());
+    builder.setReplicationConfig(repConfig);
+    builder.setState(Pipeline.PipelineState.OPEN);
+    List<DatanodeDetails> dns = new ArrayList<>();
+
+    for (int i = 0; i < nodes; i++) {
+      dns.add(MockDatanodeDetails.randomDatanodeDetails());
     }
+    builder.setNodes(dns);
+    builder.setLeaderId(dns.get(0).getUuid());
+    return builder.build();
   }
+
+  private ContainerInfo createContainer(ReplicationConfig repConfig,
+      PipelineID pipelineID) {
+    ContainerInfo.Builder builder = new ContainerInfo.Builder();
+    builder.setContainerID(1);
+    builder.setOwner("Ozone");
+    builder.setPipelineID(pipelineID);
+    builder.setReplicationConfig(repConfig);
+    builder.setState(HddsProtos.LifeCycleState.OPEN);
+    return builder.build();
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@ozone.apache.org
For additional commands, e-mail: commits-help@ozone.apache.org