You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2020/10/28 17:47:27 UTC
[hadoop] branch branch-3.3 updated: YARN-10467. ContainerIdPBImpl objects can be leaked in RMNodeImpl.completedContainers. Contributed by Haibo Chen
This is an automated email from the ASF dual-hosted git repository.
jhung pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new f95c082 YARN-10467. ContainerIdPBImpl objects can be leaked in RMNodeImpl.completedContainers. Contributed by Haibo Chen
f95c082 is described below
commit f95c0824b01175590fe98e2fba1e5988694a52da
Author: Jonathan Hung <jh...@linkedin.com>
AuthorDate: Wed Oct 28 10:32:47 2020 -0700
YARN-10467. ContainerIdPBImpl objects can be leaked in RMNodeImpl.completedContainers. Contributed by Haibo Chen
(cherry picked from commit bab5bf9743f54f48cc2f31b4e5c8b6d4e5a5cfb8)
---
.../rmapp/attempt/RMAppAttemptImpl.java | 28 +++--
.../rmapp/attempt/TestRMAppAttemptTransitions.java | 115 +++++++++++++++++++++
2 files changed, 137 insertions(+), 6 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 6087e58..34d22c6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -179,6 +179,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private long launchAMEndTime = 0;
private long scheduledTime = 0;
private long containerAllocatedTime = 0;
+ private boolean nonWorkPreservingAMContainerFinished = false;
// Set to null initially. Will eventually get set
// if an RMAppAttemptUnregistrationEvent occurs
@@ -853,7 +854,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
// A new allocate means the AM received the previously sent
// finishedContainers. We can ack this to NM now
- sendFinishedContainersToNM();
+ sendFinishedContainersToNM(finishedContainersSentToAM);
// Mark every containerStatus as being sent to AM though we may return
// only the ones that belong to the current attempt
@@ -1980,12 +1981,13 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
}
// Ack NM to remove finished containers from context.
- private void sendFinishedContainersToNM() {
- for (NodeId nodeId : finishedContainersSentToAM.keySet()) {
+ private void sendFinishedContainersToNM(
+ Map<NodeId, List<ContainerStatus>> finishedContainers) {
+ for (NodeId nodeId : finishedContainers.keySet()) {
// Clear and get current values
List<ContainerStatus> currentSentContainers =
- finishedContainersSentToAM.put(nodeId, new ArrayList<>());
+ finishedContainers.put(nodeId, new ArrayList<>());
List<ContainerId> containerIdList =
new ArrayList<>(currentSentContainers.size());
for (ContainerStatus containerStatus : currentSentContainers) {
@@ -1994,7 +1996,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
eventHandler.handle(new RMNodeFinishedContainersPulledByAMEvent(nodeId,
containerIdList));
}
- this.finishedContainersSentToAM.clear();
+ finishedContainers.clear();
}
// Add am container to the list so that am container instance will be
@@ -2020,7 +2022,16 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
appAttempt.finishedContainersSentToAM.putIfAbsent(nodeId,
new ArrayList<>());
appAttempt.finishedContainersSentToAM.get(nodeId).add(containerStatus);
- appAttempt.sendFinishedContainersToNM();
+ appAttempt.sendFinishedContainersToNM(
+ appAttempt.finishedContainersSentToAM);
+ // there might be some completed containers that have not been pulled
+ // by the AM heartbeat, explicitly add them for cleanup.
+ appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers);
+
+ // mark the fact that AM container has finished so that future finished
+ // containers will be cleaned up without the engagement of AM containers
+ // (through heartbeat)
+ appAttempt.nonWorkPreservingAMContainerFinished = true;
} else {
appAttempt.sendFinishedAMContainerToNM(nodeId,
containerStatus.getContainerId());
@@ -2048,6 +2059,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
.getNodeId(), new ArrayList<>());
appAttempt.justFinishedContainers.get(containerFinishedEvent
.getNodeId()).add(containerFinishedEvent.getContainerStatus());
+
+ if (appAttempt.nonWorkPreservingAMContainerFinished) {
+ // AM container has finished, so no more AM heartbeats to do the cleanup.
+ appAttempt.sendFinishedContainersToNM(appAttempt.justFinishedContainers);
+ }
}
private static final class ContainerFinishedAtFinalStateTransition
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
index e2f80ca..4e5ff3f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
@@ -643,6 +643,8 @@ public class TestRMAppAttemptTransitions {
RMContainer rmContainer = mock(RMContainerImpl.class);
when(scheduler.getRMContainer(container.getId())).
thenReturn(rmContainer);
+ when(container.getNodeId()).thenReturn(
+ BuilderUtils.newNodeId("localhost", 0));
applicationAttempt.handle(
new RMAppAttemptEvent(applicationAttempt.getAppAttemptId(),
@@ -1530,6 +1532,119 @@ public class TestRMAppAttemptTransitions {
.handle(Mockito.any(RMNodeEvent.class));
}
+ /**
+ * Check a completed container that is not yet pulled by AM heartbeat,
+ * is ACKed to NM for cleanup when the AM container exits.
+ */
+ @Test
+ public void testFinishedContainerNotBeingPulledByAMHeartbeat() {
+ Container amContainer = allocateApplicationAttempt();
+ launchApplicationAttempt(amContainer);
+ runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
+
+ application.handle(new RMAppRunningOnNodeEvent(application
+ .getApplicationId(), amContainer.getNodeId()));
+
+ // Complete a non-AM container
+ ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
+ .getAppAttemptId(), 2);
+ Container container1 = mock(Container.class);
+ ContainerStatus containerStatus1 = mock(ContainerStatus.class);
+ when(container1.getId()).thenReturn(
+ containerId1);
+ when(containerStatus1.getContainerId()).thenReturn(containerId1);
+ when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
+ applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+ applicationAttempt.getAppAttemptId(), containerStatus1,
+ container1.getNodeId()));
+
+ // Verify justFinishedContainers
+ ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
+ ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
+ Assert.assertEquals(1, applicationAttempt.getJustFinishedContainers()
+ .size());
+ Assert.assertEquals(container1.getId(), applicationAttempt
+ .getJustFinishedContainers().get(0).getContainerId());
+ Assert.assertTrue(
+ getFinishedContainersSentToAM(applicationAttempt).isEmpty());
+
+ // finish AM container to emulate AM exit event
+ containerStatus1 = mock(ContainerStatus.class);
+ ContainerId amContainerId = amContainer.getId();
+ when(containerStatus1.getContainerId()).thenReturn(amContainerId);
+ applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+ applicationAttempt.getAppAttemptId(), containerStatus1,
+ amContainer.getNodeId()));
+
+ Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture());
+ List<RMNodeFinishedContainersPulledByAMEvent> containerPulledEvents =
+ captor.getAllValues();
+ // Verify AM container is acked to NM via the RMNodeEvent immediately
+ Assert.assertEquals(amContainer.getId(),
+ containerPulledEvents.get(0).getContainers().get(0));
+ // Verify the non-AM container is acked to NM via the RMNodeEvent
+ Assert.assertEquals(container1.getId(),
+ containerPulledEvents.get(1).getContainers().get(0));
+ Assert.assertTrue("No container shall be added to justFinishedContainers" +
+ " as soon as AM container exits",
+ applicationAttempt.getJustFinishedContainers().isEmpty());
+ Assert.assertTrue(
+ getFinishedContainersSentToAM(applicationAttempt).isEmpty());
+ }
+
+ /**
+ * Check a completed container is ACKed to NM for cleanup after the AM
+ * container has exited.
+ */
+ @Test
+ public void testFinishedContainerAfterAMExit() {
+ Container amContainer = allocateApplicationAttempt();
+ launchApplicationAttempt(amContainer);
+ runApplicationAttempt(amContainer, "host", 8042, "oldtrackingurl", false);
+
+ // finish AM container to emulate AM exit event
+ ContainerStatus containerStatus1 = mock(ContainerStatus.class);
+ ContainerId amContainerId = amContainer.getId();
+ when(containerStatus1.getContainerId()).thenReturn(amContainerId);
+ application.handle(new RMAppRunningOnNodeEvent(application
+ .getApplicationId(),
+ amContainer.getNodeId()));
+ applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+ applicationAttempt.getAppAttemptId(), containerStatus1,
+ amContainer.getNodeId()));
+
+ // Verify AM container is acked to NM via the RMNodeEvent immediately
+ ArgumentCaptor<RMNodeFinishedContainersPulledByAMEvent> captor =
+ ArgumentCaptor.forClass(RMNodeFinishedContainersPulledByAMEvent.class);
+ Mockito.verify(rmnodeEventHandler).handle(captor.capture());
+ Assert.assertEquals(amContainer.getId(),
+ captor.getValue().getContainers().get(0));
+
+ // Complete a non-AM container
+ ContainerId containerId1 = BuilderUtils.newContainerId(applicationAttempt
+ .getAppAttemptId(), 2);
+ Container container1 = mock(Container.class);
+ containerStatus1 = mock(ContainerStatus.class);
+ when(container1.getId()).thenReturn(containerId1);
+ when(containerStatus1.getContainerId()).thenReturn(containerId1);
+ when(container1.getNodeId()).thenReturn(NodeId.newInstance("host", 1234));
+ applicationAttempt.handle(new RMAppAttemptContainerFinishedEvent(
+ applicationAttempt.getAppAttemptId(), containerStatus1,
+ container1.getNodeId()));
+
+ // Verify container is acked to NM via the RMNodeEvent immediately
+ captor = ArgumentCaptor.forClass(
+ RMNodeFinishedContainersPulledByAMEvent.class);
+ Mockito.verify(rmnodeEventHandler, times(2)).handle(captor.capture());
+ Assert.assertEquals(container1.getId(),
+ captor.getAllValues().get(1).getContainers().get(0));
+ Assert.assertTrue("No container shall be added to justFinishedContainers" +
+ " after AM container exited",
+ applicationAttempt.getJustFinishedContainers().isEmpty());
+ Assert.assertTrue(
+ getFinishedContainersSentToAM(applicationAttempt).isEmpty());
+ }
+
private static List<ContainerStatus> getFinishedContainersSentToAM(
RMAppAttempt applicationAttempt) {
List<ContainerStatus> containers = new ArrayList<ContainerStatus>();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org