You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/09/05 06:01:56 UTC
hadoop git commit: HDDS-268. Add SCM close container watcher.
Contributed by Ajay Kumar.
Repository: hadoop
Updated Branches:
refs/heads/trunk 6ccb809c2 -> 85c3fe341
HDDS-268. Add SCM close container watcher. Contributed by Ajay Kumar.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/85c3fe34
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/85c3fe34
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/85c3fe34
Branch: refs/heads/trunk
Commit: 85c3fe341a77bc1a74fdc7af64e18e4557fa8e96
Parents: 6ccb809
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Tue Sep 4 22:56:42 2018 -0700
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Tue Sep 4 22:56:57 2018 -0700
----------------------------------------------------------------------
.../hadoop/hdds/server/events/EventWatcher.java | 20 +-
.../scm/command/CommandStatusReportHandler.java | 6 +-
.../container/CloseContainerEventHandler.java | 26 ++
.../scm/container/CloseContainerWatcher.java | 100 +++++++
.../hadoop/hdds/scm/events/SCMEvents.java | 11 +
.../scm/server/StorageContainerManager.java | 8 +
.../container/TestCloseContainerWatcher.java | 287 +++++++++++++++++++
7 files changed, 450 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85c3fe34/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
index e3fee63..ba5078b 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java
@@ -102,13 +102,13 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
queue.addHandler(startEvent, this::handleStartEvent);
queue.addHandler(completionEvent, (completionPayload, publisher) -> {
- long id = completionPayload.getId();
try {
- handleCompletion(id, publisher);
+ handleCompletion(completionPayload, publisher);
} catch (LeaseNotFoundException e) {
//It's already done. Too late, we already retried it.
//Not a real problem.
- LOG.warn("Completion event without active lease. Id={}", id);
+ LOG.warn("Completion event without active lease. Id={}",
+ completionPayload.getId());
}
});
@@ -140,9 +140,11 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
}
}
- private synchronized void handleCompletion(long id,
- EventPublisher publisher) throws LeaseNotFoundException {
+ protected synchronized void handleCompletion(COMPLETION_PAYLOAD
+ completionPayload, EventPublisher publisher) throws
+ LeaseNotFoundException {
metrics.incrementCompletedEvents();
+ long id = completionPayload.getId();
leaseManager.release(id);
TIMEOUT_PAYLOAD payload = trackedEventsByID.remove(id);
trackedEvents.remove(payload);
@@ -196,4 +198,12 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
protected EventWatcherMetrics getMetrics() {
return metrics;
}
+
+ /**
+ * Returns a tracked event to which the specified id is
+ * mapped, or {@code null} if there is no mapping for the id.
+ */
+ public TIMEOUT_PAYLOAD getTrackedEventbyId(long id) {
+ return trackedEventsByID.get(id);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85c3fe34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
index 9413a46..054665a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
@@ -103,7 +103,7 @@ public class CommandStatusReportHandler implements
* Wrapper event for Replicate Command.
*/
public static class ReplicationStatus extends CommandStatusEvent {
- ReplicationStatus(CommandStatus cmdStatus) {
+ public ReplicationStatus(CommandStatus cmdStatus) {
super(cmdStatus);
}
}
@@ -112,7 +112,7 @@ public class CommandStatusReportHandler implements
* Wrapper event for CloseContainer Command.
*/
public static class CloseContainerStatus extends CommandStatusEvent {
- CloseContainerStatus(CommandStatus cmdStatus) {
+ public CloseContainerStatus(CommandStatus cmdStatus) {
super(cmdStatus);
}
}
@@ -121,7 +121,7 @@ public class CommandStatusReportHandler implements
* Wrapper event for DeleteBlock Command.
*/
public static class DeleteBlockCommandStatus extends CommandStatusEvent {
- DeleteBlockCommandStatus(CommandStatus cmdStatus) {
+ public DeleteBlockCommandStatus(CommandStatus cmdStatus) {
super(cmdStatus);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85c3fe34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 863907e..b94ce4f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -23,12 +23,14 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.hadoop.hdds.scm.events.SCMEvents.DATANODE_COMMAND;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ;
/**
* In case of a node failure, volume failure, volume out of spapce, node
@@ -80,6 +82,8 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
new CloseContainerCommand(containerID.getId(),
info.getReplicationType(), info.getPipelineID()));
publisher.fireEvent(DATANODE_COMMAND, closeContainerCommand);
+ publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ, new
+ CloseContainerRetryableReq(containerID));
}
try {
// Finalize event will make sure the state of the container transitions
@@ -107,4 +111,26 @@ public class CloseContainerEventHandler implements EventHandler<ContainerID> {
}
}
+
+ /**
+ * Class to create retryable event. Prevents redundant requests for same
+ * container Id.
+ */
+ public static class CloseContainerRetryableReq implements
+ IdentifiableEventPayload {
+
+ private ContainerID containerID;
+ public CloseContainerRetryableReq(ContainerID containerID) {
+ this.containerID = containerID;
+ }
+
+ public ContainerID getContainerID() {
+ return containerID;
+ }
+
+ @Override
+ public long getId() {
+ return containerID.getId();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85c3fe34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
new file mode 100644
index 0000000..8e277b9
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerWatcher.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * <p>Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
+ .CloseContainerStatus;
+
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventWatcher;
+import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
+ .CloseContainerRetryableReq;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.ozone.lease.LeaseNotFoundException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * This watcher will watch for CLOSE_CONTAINER_STATUS events fired from
+ * CommandStatusReport. If required it will re-trigger CloseContainer command
+ * for DataNodes to CloseContainerEventHandler.
+ */
+public class CloseContainerWatcher extends
+ EventWatcher<CloseContainerRetryableReq, CloseContainerStatus> {
+
+ public static final Logger LOG =
+ LoggerFactory.getLogger(CloseContainerWatcher.class);
+ private final Mapping containerManager;
+
+ public CloseContainerWatcher(Event<CloseContainerRetryableReq> startEvent,
+ Event<CloseContainerStatus> completionEvent,
+ LeaseManager<Long> leaseManager, Mapping containerManager) {
+ super(startEvent, completionEvent, leaseManager);
+ this.containerManager = containerManager;
+ }
+
+ @Override
+ protected void onTimeout(EventPublisher publisher,
+ CloseContainerRetryableReq payload) {
+ // Let CloseContainerEventHandler handle this message.
+ this.resendEventToHandler(payload.getId(), publisher);
+ }
+
+ @Override
+ protected void onFinished(EventPublisher publisher,
+ CloseContainerRetryableReq payload) {
+ LOG.trace("CloseContainerCommand for containerId: {} executed ", payload
+ .getContainerID().getId());
+ }
+
+ @Override
+ protected synchronized void handleCompletion(CloseContainerStatus status,
+ EventPublisher publisher) throws LeaseNotFoundException {
+ // If status is PENDING then return without doing anything.
+ if(status.getCmdStatus().getStatus().equals(Status.PENDING)){
+ return;
+ }
+
+ CloseContainerRetryableReq closeCont = getTrackedEventbyId(status.getId());
+ super.handleCompletion(status, publisher);
+ // If status is FAILED then send a msg to Handler to resend the command.
+ if (status.getCmdStatus().getStatus().equals(Status.FAILED) && closeCont
+ != null) {
+ this.resendEventToHandler(closeCont.getId(), publisher);
+ }
+ }
+
+ private void resendEventToHandler(long containerID, EventPublisher
+ publisher) {
+ try {
+ // Check if container is still open
+ if (containerManager.getContainer(containerID).isContainerOpen()) {
+ publisher.fireEvent(SCMEvents.CLOSE_CONTAINER,
+ ContainerID.valueof(containerID));
+ }
+ } catch (IOException e) {
+ LOG.warn("Error in CloseContainerWatcher while processing event " +
+ "for containerId {} ExceptionMsg: ", containerID, e.getMessage());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85c3fe34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 5911ce2..9a4f887 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
.DeleteBlockCommandStatus;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
.ReplicationStatus;
+import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler.CloseContainerRetryableReq;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.ContainerActionsFromDatanode;
@@ -104,6 +105,16 @@ public final class SCMEvents {
new TypedEvent<>(ContainerID.class, "Close_Container");
/**
+ * A CLOSE_CONTAINER_RETRYABLE_REQ will be triggered by
+ * CloseContainerEventHandler after sending a SCMCommand to DataNode.
+ * CloseContainerWatcher will track this event. Watcher will be responsible
+ * for retrying it in event of failure or timeout.
+ */
+ public static final TypedEvent<CloseContainerRetryableReq>
+ CLOSE_CONTAINER_RETRYABLE_REQ = new TypedEvent<>(
+ CloseContainerRetryableReq.class, "Close_Container_Retryable");
+
+ /**
* This event will be triggered whenever a new datanode is registered with
* SCM.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85c3fe34/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 7e2bc23..061ff78 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -37,6 +37,7 @@ import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
import org.apache.hadoop.hdds.scm.block.PendingDeleteHandler;
import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
+import org.apache.hadoop.hdds.scm.container.CloseContainerWatcher;
import org.apache.hadoop.hdds.scm.container.ContainerActionsHandler;
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
@@ -257,6 +258,13 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
scmContainerManager.getStateManager(), eventQueue,
commandWatcherLeaseManager);
+ // setup CloseContainer watcher
+ CloseContainerWatcher closeContainerWatcher =
+ new CloseContainerWatcher(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+ SCMEvents.CLOSE_CONTAINER_STATUS, commandWatcherLeaseManager,
+ scmContainerManager);
+ closeContainerWatcher.start(eventQueue);
+
scmAdminUsernames = conf.getTrimmedStringCollection(OzoneConfigKeys
.OZONE_ADMINISTRATORS);
scmUsername = UserGroupInformation.getCurrentUser().getUserName();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85c3fe34/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/TestCloseContainerWatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/TestCloseContainerWatcher.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/TestCloseContainerWatcher.java
new file mode 100644
index 0000000..56c3830
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/TestCloseContainerWatcher.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container;
+
+import org.apache.hadoop.hdds.HddsIdFactory;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
+import org.apache.hadoop.hdds.scm.command.CommandStatusReportHandler
+ .CloseContainerStatus;
+import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler
+ .CloseContainerRetryableReq;
+import org.apache.hadoop.hdds.scm.container.CloseContainerWatcher;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerMapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.EventHandler;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.server.events.EventWatcher;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test class for {@link CloseContainerWatcher}.
+ * */
+public class TestCloseContainerWatcher implements EventHandler<ContainerID> {
+
+ private static final Logger LOG = LoggerFactory
+ .getLogger(TestCloseContainerWatcher.class);
+ private static EventWatcher<CloseContainerRetryableReq, CloseContainerStatus>
+ watcher;
+ private static LeaseManager<Long> leaseManager;
+ private static ContainerMapping containerMapping = Mockito
+ .mock(ContainerMapping.class);
+ private static EventQueue queue;
+ @Rule
+ public Timeout timeout = new Timeout(1000*15);
+
+ @After
+ public void stop() {
+ leaseManager.shutdown();
+ queue.close();
+ }
+
+ /*
+ * This test will test watcher for Failure status event.
+ * */
+ @Test
+ public void testWatcherForFailureStatusEvent() throws
+ InterruptedException, IOException {
+ setupWatcher(90000L);
+ long id1 = HddsIdFactory.getLongId();
+ long id2 = HddsIdFactory.getLongId();
+ queue.addHandler(SCMEvents.CLOSE_CONTAINER, this);
+ setupMock(id1, id2, true);
+ GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer
+ .captureLogs(LOG);
+ GenericTestUtils.LogCapturer watcherLogger = GenericTestUtils.LogCapturer
+ .captureLogs(CloseContainerWatcher.LOG);
+ GenericTestUtils.setLogLevel(CloseContainerWatcher.LOG, Level.TRACE);
+ testLogger.clearOutput();
+ watcherLogger.clearOutput();
+
+ CommandStatus cmdStatus1 = CommandStatus.newBuilder()
+ .setCmdId(id1)
+ .setStatus(CommandStatus.Status.FAILED)
+ .setType(Type.closeContainerCommand).build();
+ CommandStatus cmdStatus2 = CommandStatus.newBuilder()
+ .setCmdId(id2)
+ .setStatus(CommandStatus.Status.FAILED)
+ .setType(Type.closeContainerCommand).build();
+
+ // File events to watcher
+ queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+ new CloseContainerRetryableReq(ContainerID.valueof(id1)));
+ queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+ new CloseContainerRetryableReq(ContainerID.valueof(id2)));
+ Thread.sleep(10L);
+ queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
+ CloseContainerStatus(cmdStatus1));
+ queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
+ CloseContainerStatus(cmdStatus2));
+
+ Thread.sleep(1000*4L);
+ // validation
+ assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand for " +
+ "containerId: " + id1 + " executed"));
+ assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand for " +
+ "containerId: " + id2 + " executed"));
+ assertTrue(
+ testLogger.getOutput().contains("Handling closeContainerEvent " +
+ "for containerId: id=" + id1));
+ assertTrue(testLogger.getOutput().contains("Handling closeContainerEvent " +
+ "for containerId: id=" + id2));
+
+ }
+
+ @Test
+ public void testWatcherForPendingStatusEvent() throws
+ InterruptedException, IOException {
+ setupWatcher(90000L);
+ long id1 = HddsIdFactory.getLongId();
+ long id2 = HddsIdFactory.getLongId();
+ queue.addHandler(SCMEvents.CLOSE_CONTAINER, this);
+ setupMock(id1, id2, true);
+ GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer
+ .captureLogs(LOG);
+ GenericTestUtils.LogCapturer watcherLogger = GenericTestUtils.LogCapturer
+ .captureLogs(CloseContainerWatcher.LOG);
+ GenericTestUtils.setLogLevel(CloseContainerWatcher.LOG, Level.TRACE);
+ testLogger.clearOutput();
+ watcherLogger.clearOutput();
+
+ CommandStatus cmdStatus1 = CommandStatus.newBuilder()
+ .setCmdId(id1)
+ .setStatus(CommandStatus.Status.PENDING)
+ .setType(Type.closeContainerCommand).build();
+ CommandStatus cmdStatus2 = CommandStatus.newBuilder()
+ .setCmdId(id2)
+ .setStatus(CommandStatus.Status.PENDING)
+ .setType(Type.closeContainerCommand).build();
+
+ // File events to watcher
+ queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+ new CloseContainerRetryableReq(ContainerID.valueof(id1)));
+ queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+ new CloseContainerRetryableReq(ContainerID.valueof(id2)));
+ Thread.sleep(10L);
+ queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
+ CloseContainerStatus(cmdStatus1));
+ queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS, new
+ CloseContainerStatus(cmdStatus2));
+
+ Thread.sleep(1000*2L);
+ // validation
+ assertFalse(watcherLogger.getOutput().contains("CloseContainerCommand "
+ + "for containerId: " + id1 + " executed"));
+ assertFalse(watcherLogger.getOutput().contains("CloseContainerCommand "
+ + "for containerId: " + id2 + " executed"));
+ assertFalse(testLogger.getOutput().contains("Handling "
+ + "closeContainerEvent for containerId: id=" + id1));
+ assertFalse(testLogger.getOutput().contains("Handling "
+ + "closeContainerEvent for containerId: id=" + id2));
+
+ }
+
+ @Test
+ public void testWatcherForExecutedStatusEvent()
+ throws IOException, InterruptedException {
+ setupWatcher(90000L);
+ long id1 = HddsIdFactory.getLongId();
+ long id2 = HddsIdFactory.getLongId();
+ queue.addHandler(SCMEvents.CLOSE_CONTAINER, this);
+ setupMock(id1, id2, true);
+ GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer
+ .captureLogs(LOG);
+ GenericTestUtils.LogCapturer watcherLogger = GenericTestUtils.LogCapturer
+ .captureLogs(CloseContainerWatcher.LOG);
+ GenericTestUtils.setLogLevel(CloseContainerWatcher.LOG, Level.TRACE);
+ testLogger.clearOutput();
+ watcherLogger.clearOutput();
+
+ // When both of the pending event are executed successfully by DataNode
+ CommandStatus cmdStatus1 = CommandStatus.newBuilder()
+ .setCmdId(id1)
+ .setStatus(CommandStatus.Status.EXECUTED)
+ .setType(Type.closeContainerCommand).build();
+ CommandStatus cmdStatus2 = CommandStatus.newBuilder()
+ .setCmdId(id2)
+ .setStatus(CommandStatus.Status.EXECUTED)
+ .setType(Type.closeContainerCommand).build();
+ // File events to watcher
+ testLogger.clearOutput();
+ watcherLogger.clearOutput();
+ queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+ new CloseContainerRetryableReq(ContainerID.valueof(id1)));
+ queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+ new CloseContainerRetryableReq(ContainerID.valueof(id2)));
+ Thread.sleep(10L);
+ queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS,
+ new CloseContainerStatus(cmdStatus1));
+ queue.fireEvent(SCMEvents.CLOSE_CONTAINER_STATUS,
+ new CloseContainerStatus(cmdStatus2));
+
+ Thread.sleep(1000*3L);
+ // validation
+ assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand "
+ + "for containerId: " + id1 + " executed"));
+ assertTrue(watcherLogger.getOutput().contains("CloseContainerCommand "
+ + "for containerId: " + id2 + " executed"));
+ assertFalse(testLogger.getOutput().contains("Handling "
+ + "closeContainerEvent for containerId: id=" + id1));
+ assertFalse(testLogger.getOutput().contains("Handling "
+ + "closeContainerEvent for containerId: id=" + id2));
+ }
+
+ private void setupWatcher(long time) {
+ leaseManager = new LeaseManager<>("TestCloseContainerWatcher#LeaseManager",
+ time);
+ leaseManager.start();
+ watcher = new CloseContainerWatcher(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+ SCMEvents.CLOSE_CONTAINER_STATUS, leaseManager, containerMapping);
+ queue = new EventQueue();
+ watcher.start(queue);
+ }
+
+ /*
+ * This test will fire two retryable closeContainer events. Both will timeout.
+ * First event container will be open at time of handling so it should be
+ * sent back to appropriate handler. Second event container will be closed,
+ * so it should not be retried.
+ * */
+ @Test
+ public void testWatcherRetryableTimeoutHandling() throws InterruptedException,
+ IOException {
+
+ long id1 = HddsIdFactory.getLongId();
+ long id2 = HddsIdFactory.getLongId();
+ setupWatcher(1000L);
+ queue.addHandler(SCMEvents.CLOSE_CONTAINER, this);
+ setupMock(id1, id2, false);
+ GenericTestUtils.LogCapturer testLogger = GenericTestUtils.LogCapturer
+ .captureLogs(LOG);
+ testLogger.clearOutput();
+
+ // File events to watcher
+ queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+ new CloseContainerRetryableReq(ContainerID.valueof(id1)));
+ queue.fireEvent(SCMEvents.CLOSE_CONTAINER_RETRYABLE_REQ,
+ new CloseContainerRetryableReq(ContainerID.valueof(id2)));
+
+ Thread.sleep(1000L + 10);
+
+ // validation
+ assertTrue(testLogger.getOutput().contains("Handling "
+ + "closeContainerEvent for containerId: id=" + id1));
+ assertFalse(testLogger.getOutput().contains("Handling "
+ + "closeContainerEvent for containerId: id=" + id2));
+ }
+
+
+ private void setupMock(long id1, long id2, boolean isOpen)
+ throws IOException {
+ ContainerInfo containerInfo = Mockito.mock(ContainerInfo.class);
+ ContainerInfo containerInfo2 = Mockito.mock(ContainerInfo.class);
+ when(containerMapping.getContainer(id1)).thenReturn(containerInfo);
+ when(containerMapping.getContainer(id2)).thenReturn(containerInfo2);
+ when(containerInfo.isContainerOpen()).thenReturn(true);
+ when(containerInfo2.isContainerOpen()).thenReturn(isOpen);
+ }
+
+ @Override
+ public void onMessage(ContainerID containerID, EventPublisher publisher) {
+ LOG.info("Handling closeContainerEvent for containerId: {}", containerID);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org