You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/09/05 06:01:56 UTC

hadoop git commit: HDDS-268. Add SCM close container watcher. Contributed by Ajay Kumar.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 6ccb809c2 -> 85c3fe341


HDDS-268. Add SCM close container watcher. Contributed by Ajay Kumar.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/85c3fe34
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/85c3fe34
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/85c3fe34

Branch: refs/heads/trunk
Commit: 85c3fe341a77bc1a74fdc7af64e18e4557fa8e96
Parents: 6ccb809
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Tue Sep 4 22:56:42 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Tue Sep 4 22:56:57 2018 -0700

----------------------------------------------------------------------
 .../hadoop/hdds/server/events/EventWatcher.java |  20 +-
 .../scm/command/CommandStatusReportHandler.java |   6 +-
 .../container/CloseContainerEventHandler.java   |  26 ++
 .../scm/container/CloseContainerWatcher.java    | 100 +++++++
 .../hadoop/hdds/scm/events/SCMEvents.java       |  11 +
 .../scm/server/StorageContainerManager.java     |   8 +
 .../container/TestCloseContainerWatcher.java    | 287 +++++++++++++++++++
 7 files changed, 450 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/85c3fe34/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
index e3fee63..ba5078b 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
@@ -102,13 +102,13 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
     queue.addHandler(startEvent, this::handleStartEvent);
 
     queue.addHandler(completionEvent, (completionPayload, publisher) -> {
-      long id = completionPayload.getId();
       try {
-        handleCompletion(id, publisher);
+        handleCompletion(completionPayload, publisher);
       } catch (LeaseNotFoundException e) {
         //It's already done. Too late, we already retried it.
         //Not a real problem.
-        LOG.warn("Completion event without active lease. Id={}", id);
+        LOG.warn("Completion event without active lease. Id={}",
+            completionPayload.getId());
       }
     });
 
@@ -140,9 +140,11 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
     }
   }
 
-  private synchronized void handleCompletion(long id,
-      EventPublisher publisher) throws LeaseNotFoundException {
+  protected synchronized void handleCompletion(COMPLETION_PAYLOAD
+      completionPayload, EventPublisher publisher) throws
+      LeaseNotFoundException {
     metrics.incrementCompletedEvents();
+    long id = completionPayload.getId();
     leaseManager.release(id);
     TIMEOUT_PAYLOAD payload = trackedEventsByID.remove(id);
     trackedEvents.remove(payload);
@@ -196,4 +198,12 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
   protected EventWatcherMetrics getMetrics() {
     return metrics;
   }
+
+  /**
+   * Looks up the tracked event that is mapped to the specified id.
+   * @return the tracked event, or {@code null} if the id is not mapped.
+   */
+  public TIMEOUT_PAYLOAD getTrackedEventbyId(long id) {
+    return trackedEventsByID.get(id);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85c3fe34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
index 9413a46..054665a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
@@ -103,7 +103,7 @@ public class CommandStatusReportHandler implements
    * Wrapper event for Replicate Command.
    */
   public static class ReplicationStatus extends CommandStatusEvent {
-    ReplicationStatus(CommandStatus cmdStatus) {
+    public ReplicationStatus(CommandStatus cmdStatus) {
       super(cmdStatus);
     }
   }
@@ -112,7 +112,7 @@ public class CommandStatusReportHandler implements
    * Wrapper event for CloseContainer Command.
    */
   public static class CloseContainerStatus extends CommandStatusEvent {
-    CloseContainerStatus(CommandStatus cmdStatus) {
+    public CloseContainerStatus(CommandStatus cmdStatus) {
       super(cmdStatus);
     }
   }
@@ -121,7 +121,7 @@ public class CommandStatusReportHandler implements
    * Wrapper event for DeleteBlock Command.
    */
   public static class DeleteBlockCommandStatus extends CommandStatusEvent {
-    DeleteBlockCommandStatus(CommandStatus cmdStatus) {
+    public DeleteBlockCommandStatus(CommandStatus cmdStatus) {
       super(cmdStatus);
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85c3fe34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 863907e..b94ce4f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -23,12 +23,14 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.server.events.EventHandler;
 import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ;
 
 /**
  * In case of a node failure, volume failure, volume out of spapce, node
@@ -80,6 +82,8 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
             new CloseContainerCommand(containerID.getId(),
                 info.getReplicationType(), info.getPipelineID()));
         publisher.fireEvent(DATANODE_COMMAND, closeContainerCommand);
+        publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ, new
+            CloseContainerRetryableReq(containerID));
       }
       try {
         // Finalize event will make sure the state of the container transitions
@@ -107,4 +111,26 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
     }
 
   }
+
+  /**
+   * Payload for a retryable close container event. The container id is used
+   * as the event id, which prevents redundant requests for the same container.
+   */
+  public static class CloseContainerRetryableReq implements
+      IdentifiableEventPayload {
+    private final ContainerID containerID;
+
+    public CloseContainerRetryableReq(ContainerID containerID) {
+      this.containerID = containerID;
+    }
+
+    public ContainerID getContainerID() {
+      return containerID;
+    }
+
+    @Override
+    public long getId() {
+      return containerID.getId();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85c3fe34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
new file mode 100644
index 0000000..8e277b9
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
+    .CloseContainerStatus;
+
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventWatcher;
+import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
+    .CloseContainerRetryableReq;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.ozone.lease.LeaseNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Watches CLOSE_CONTAINER_STATUS events fired from CommandStatusReport and
+ * re-fires CLOSE_CONTAINER to CloseContainerEventHandler when a tracked close
+ * command fails or times out while the container is still open.
+ */
+public class CloseContainerWatcher extends
+    EventWatcher<CloseContainerRetryableReq, CloseContainerStatus> {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(CloseContainerWatcher.class);
+  // Consulted before every retry to check that the container is still open.
+  private final Mapping containerManager;
+
+  public CloseContainerWatcher(Event<CloseContainerRetryableReq> startEvent,
+      Event<CloseContainerStatus> completionEvent,
+      LeaseManager<Long> leaseManager, Mapping containerManager) {
+    super(startEvent, completionEvent, leaseManager);
+    this.containerManager = containerManager;
+  }
+
+  @Override
+  protected void onTimeout(EventPublisher publisher,
+      CloseContainerRetryableReq payload) {
+    // Timed out: let CloseContainerEventHandler handle the request again.
+    this.resendEventToHandler(payload.getId(), publisher);
+  }
+
+  @Override
+  protected void onFinished(EventPublisher publisher,
+      CloseContainerRetryableReq payload) {
+    LOG.trace("CloseContainerCommand for containerId: {} executed ", payload
+        .getContainerID().getId());
+  }
+
+  @Override
+  protected synchronized void handleCompletion(CloseContainerStatus status,
+      EventPublisher publisher) throws LeaseNotFoundException {
+    Status cmdStatus = status.getCmdStatus().getStatus();
+    // PENDING is not final: keep tracking and wait for the next report.
+    if (cmdStatus.equals(Status.PENDING)) {
+      return;
+    }
+    // Look up the request first; super.handleCompletion() stops tracking it.
+    CloseContainerRetryableReq closeCont = getTrackedEventbyId(status.getId());
+    super.handleCompletion(status, publisher);
+    // A FAILED command is sent back to the handler so it can be resent.
+    if (closeCont != null && cmdStatus.equals(Status.FAILED)) {
+      this.resendEventToHandler(closeCont.getId(), publisher);
+    }
+  }
+
+  // Re-fires CLOSE_CONTAINER only if the container is still open.
+  private void resendEventToHandler(long containerID,
+      EventPublisher publisher) {
+    try {
+      if (containerManager.getContainer(containerID).isContainerOpen()) {
+        publisher.fireEvent(SCMEvents.CLOSE_CONTAINER,
+            ContainerID.valueof(containerID));
+      }
+    } catch (IOException e) {
+      LOG.warn("Error in CloseContainerWatcher while processing event " +
+          "for containerId {} ExceptionMsg: {}", containerID, e.getMessage());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85c3fe34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 5911ce2..9a4f887 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
     .DeleteBlockCommandStatus;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
     .ReplicationStatus;
+import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler.CloseContainerRetryableReq;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
     .ContainerActionsFromDatanode;
@@ -104,6 +105,16 @@ public final class SCMEvents {
       new TypedEvent<>(ContainerID.class, "Close_Container");
 
   /**
+   * A CLOSE_CONTAINER_RETRYABLE_REQ will be triggered by
+   * CloseContainerEventHandler after sending a SCMCommand to DataNode.
+   * CloseContainerWatcher will track this event. Watcher will be responsible
+   * for retrying it in event of failure or timeout.
+   */
+  public static final TypedEvent<CloseContainerRetryableReq>
+      CLOSE_CONTAINER_RETRYABLE_REQ = new TypedEvent<>(
+      CloseContainerRetryableReq.class, "Close_Container_Retryable");
+
+  /**
    * This event will be triggered whenever a new datanode is registered with
    * SCM.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85c3fe34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 7e2bc23..061ff78 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
 import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
 import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
 import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
+import org.apache.hadoop.hdds.scm.container.CloseContainerWatcher;
 import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
 import org.apache.hadoop.hdds.scm.container.ContainerMapping;
 import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
@@ -257,6 +258,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
         scmContainerManager.getStateManager(), eventQueue,
         commandWatcherLeaseManager);
 
+    // setup CloseContainer watcher
+    CloseContainerWatcher closeContainerWatcher =
+        new CloseContainerWatcher(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+            SCMEvents.CLOSE_CONTAINER_STATUS, commandWatcherLeaseManager,
+            scmContainerManager);
+    closeContainerWatcher.start(eventQueue);
+
     scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
         .OZONE_ADMINISTRATORS);
     scmUsername = UserGroupInformation.getCurrentUser().getUserName();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/85c3fe34/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/TestCloseContainerWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/TestCloseContainerWatcher.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/TestCloseContainerWatcher.java
new file mode 100644
index 0000000..56c3830
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/TestCloseContainerWatcher.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container;
+
+import org.apache.hadoop.hdds.HddsIdFactory;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
+    .CloseContainerStatus;
+import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
+    .CloseContainerRetryableReq;
+import org.apache.hadoop.hdds.scm.container.CloseContainerWatcher;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerMapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.server.events.EventWatcher;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test class for {@link CloseContainerWatcher}.
+ * */
+public class TestCloseContainerWatcher implements EventHandler<ContainerID> {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(TestCloseContainerWatcher.class);
+  private static EventWatcher<CloseContainerRetryableReq, CloseContainerStatus>
+      watcher;
+  private static LeaseManager<Long> leaseManager;
+  private static ContainerMapping containerMapping = Mockito
+      .mock(ContainerMapping.class);
+  private static EventQueue queue;
+  @Rule
+  public Timeout timeout = new Timeout(1000*15);
+
+  @After
+  public void stop() {
+    leaseManager.shutdown();
+    queue.close();
+  }
+
+  /*
+   * This test will test watcher for Failure status event.
+   * */
+  @Test
+  public void testWatcherForFailureStatusEvent() throws
+      InterruptedException, IOException {
+    setupWatcher(90000L);
+    long id1 = HddsIdFactory.getLongId();
+    long id2 = HddsIdFactory.getLongId();
+    queue.addHandler(SCMEvents.CLOSE_CONTAINER, this);
+    setupMock(id1, id2, true);
+    GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer
+        .captureLogs(LOG);
+    GenericTestUtils.LogCapturer watcherLogger = GenericTestUtils.LogCapturer
+        .captureLogs(CloseContainerWatcher.LOG);
+    GenericTestUtils.setLogLevel(CloseContainerWatcher.LOG, Level.TRACE);
+    testLogger.clearOutput();
+    watcherLogger.clearOutput();
+
+    CommandStatus cmdStatus1 = CommandStatus.newBuilder()
+        .setCmdId(id1)
+        .setStatus(CommandStatus.Status.FAILED)
+        .setType(Type.closeContainerCommand).build();
+    CommandStatus cmdStatus2 = CommandStatus.newBuilder()
+        .setCmdId(id2)
+        .setStatus(CommandStatus.Status.FAILED)
+        .setType(Type.closeContainerCommand).build();
+
+    // Fire events to watcher
+    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+        new CloseContainerRetryableReq(ContainerID.valueof(id1)));
+    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+        new CloseContainerRetryableReq(ContainerID.valueof(id2)));
+    Thread.sleep(10L);
+    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
+        CloseContainerStatus(cmdStatus1));
+    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
+        CloseContainerStatus(cmdStatus2));
+
+    Thread.sleep(1000*4L);
+    // validation
+    assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand for " +
+        "containerId: " + id1 + " executed"));
+    assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand for " +
+        "containerId: " + id2 + " executed"));
+    assertTrue(
+        testLogger.getOutput().contains("Handling closeContainerEvent " +
+            "for containerId: id=" + id1));
+    assertTrue(testLogger.getOutput().contains("Handling closeContainerEvent " +
+        "for containerId: id=" + id2));
+
+  }
+
+  @Test
+  public void testWatcherForPendingStatusEvent() throws
+      InterruptedException, IOException {
+    setupWatcher(90000L);
+    long id1 = HddsIdFactory.getLongId();
+    long id2 = HddsIdFactory.getLongId();
+    queue.addHandler(SCMEvents.CLOSE_CONTAINER, this);
+    setupMock(id1, id2, true);
+    GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer
+        .captureLogs(LOG);
+    GenericTestUtils.LogCapturer watcherLogger = GenericTestUtils.LogCapturer
+        .captureLogs(CloseContainerWatcher.LOG);
+    GenericTestUtils.setLogLevel(CloseContainerWatcher.LOG, Level.TRACE);
+    testLogger.clearOutput();
+    watcherLogger.clearOutput();
+
+    CommandStatus cmdStatus1 = CommandStatus.newBuilder()
+        .setCmdId(id1)
+        .setStatus(CommandStatus.Status.PENDING)
+        .setType(Type.closeContainerCommand).build();
+    CommandStatus cmdStatus2 = CommandStatus.newBuilder()
+        .setCmdId(id2)
+        .setStatus(CommandStatus.Status.PENDING)
+        .setType(Type.closeContainerCommand).build();
+
+    // Fire events to watcher
+    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+        new CloseContainerRetryableReq(ContainerID.valueof(id1)));
+    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+        new CloseContainerRetryableReq(ContainerID.valueof(id2)));
+    Thread.sleep(10L);
+    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
+        CloseContainerStatus(cmdStatus1));
+    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
+        CloseContainerStatus(cmdStatus2));
+
+    Thread.sleep(1000*2L);
+    // validation
+    assertFalse(watcherLogger.getOutput().contains("CloseContainerCommand "
+        + "for containerId: " + id1 + " executed"));
+    assertFalse(watcherLogger.getOutput().contains("CloseContainerCommand "
+        + "for containerId: " + id2 + " executed"));
+    assertFalse(testLogger.getOutput().contains("Handling "
+        + "closeContainerEvent for containerId: id=" + id1));
+    assertFalse(testLogger.getOutput().contains("Handling "
+        + "closeContainerEvent for containerId: id=" + id2));
+
+  }
+
+  @Test
+  public void testWatcherForExecutedStatusEvent()
+      throws IOException, InterruptedException {
+    setupWatcher(90000L);
+    long id1 = HddsIdFactory.getLongId();
+    long id2 = HddsIdFactory.getLongId();
+    queue.addHandler(SCMEvents.CLOSE_CONTAINER, this);
+    setupMock(id1, id2, true);
+    GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer
+        .captureLogs(LOG);
+    GenericTestUtils.LogCapturer watcherLogger = GenericTestUtils.LogCapturer
+        .captureLogs(CloseContainerWatcher.LOG);
+    GenericTestUtils.setLogLevel(CloseContainerWatcher.LOG, Level.TRACE);
+    testLogger.clearOutput();
+    watcherLogger.clearOutput();
+
+    // When both of the pending event are executed successfully by DataNode
+    CommandStatus cmdStatus1 = CommandStatus.newBuilder()
+        .setCmdId(id1)
+        .setStatus(CommandStatus.Status.EXECUTED)
+        .setType(Type.closeContainerCommand).build();
+    CommandStatus cmdStatus2 = CommandStatus.newBuilder()
+        .setCmdId(id2)
+        .setStatus(CommandStatus.Status.EXECUTED)
+        .setType(Type.closeContainerCommand).build();
+    // Fire events to watcher
+    testLogger.clearOutput();
+    watcherLogger.clearOutput();
+    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+        new CloseContainerRetryableReq(ContainerID.valueof(id1)));
+    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+        new CloseContainerRetryableReq(ContainerID.valueof(id2)));
+    Thread.sleep(10L);
+    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS,
+        new CloseContainerStatus(cmdStatus1));
+    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS,
+        new CloseContainerStatus(cmdStatus2));
+
+    Thread.sleep(1000*3L);
+    // validation
+    assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand "
+        + "for containerId: " + id1 + " executed"));
+    assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand "
+        + "for containerId: " + id2 + " executed"));
+    assertFalse(testLogger.getOutput().contains("Handling "
+        + "closeContainerEvent for containerId: id=" + id1));
+    assertFalse(testLogger.getOutput().contains("Handling "
+        + "closeContainerEvent for containerId: id=" + id2));
+  }
+
+  private void setupWatcher(long time) {
+    leaseManager = new LeaseManager<>("TestCloseContainerWatcher#LeaseManager",
+        time);
+    leaseManager.start();
+    watcher = new CloseContainerWatcher(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+        SCMEvents.CLOSE_CONTAINER_STATUS, leaseManager, containerMapping);
+    queue = new EventQueue();
+    watcher.start(queue);
+  }
+
+  /*
+   * This test will fire two retryable closeContainer events. Both will timeout.
+   * First event container will be open at time of handling so it should be
+   * sent back to appropriate handler. Second event container will be closed,
+   * so it should not be retried.
+   * */
+  @Test
+  public void testWatcherRetryableTimeoutHandling() throws InterruptedException,
+      IOException {
+
+    long id1 = HddsIdFactory.getLongId();
+    long id2 = HddsIdFactory.getLongId();
+    setupWatcher(1000L);
+    queue.addHandler(SCMEvents.CLOSE_CONTAINER, this);
+    setupMock(id1, id2, false);
+    GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer
+        .captureLogs(LOG);
+    testLogger.clearOutput();
+
+    // Fire events to watcher
+    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+        new CloseContainerRetryableReq(ContainerID.valueof(id1)));
+    queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+        new CloseContainerRetryableReq(ContainerID.valueof(id2)));
+
+    Thread.sleep(1000L + 10);
+
+    // validation
+    assertTrue(testLogger.getOutput().contains("Handling "
+        + "closeContainerEvent for containerId: id=" + id1));
+    assertFalse(testLogger.getOutput().contains("Handling "
+        + "closeContainerEvent for containerId: id=" + id2));
+  }
+
+
+  private void setupMock(long id1, long id2, boolean isOpen)
+      throws IOException {
+    ContainerInfo containerInfo = Mockito.mock(ContainerInfo.class);
+    ContainerInfo containerInfo2 = Mockito.mock(ContainerInfo.class);
+    when(containerMapping.getContainer(id1)).thenReturn(containerInfo);
+    when(containerMapping.getContainer(id2)).thenReturn(containerInfo2);
+    when(containerInfo.isContainerOpen()).thenReturn(true);
+    when(containerInfo2.isContainerOpen()).thenReturn(isOpen);
+  }
+
+  @Override
+  public void onMessage(ContainerID containerID, EventPublisher publisher) {
+    LOG.info("Handling closeContainerEvent for containerId: {}", containerID);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org