You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to common-commits@hadoop.apache.org by aw...@apache.org on 2015/07/31 23:53:43 UTC
[03/10] hadoop git commit: YARN-433. When RM is catching up with node
updates then it should not expire acquired containers. Contributed by Xuan
Gong
YARN-433. When RM is catching up with node updates then it should not expire acquired containers. Contributed by Xuan Gong
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ab80e277
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ab80e277
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ab80e277
Branch: refs/heads/HADOOP-12111
Commit: ab80e277039a586f6d6259b2511ac413e29ea4f8
Parents: 2087eaf
Author: Zhihai Xu <zx...@apache.org>
Authored: Thu Jul 30 21:56:25 2015 -0700
Committer: Zhihai Xu <zx...@apache.org>
Committed: Thu Jul 30 21:57:11 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../rmcontainer/RMContainerImpl.java | 27 +--------
.../resourcemanager/rmnode/RMNodeImpl.java | 8 +++
.../resourcemanager/TestRMNodeTransitions.java | 62 ++++++++++++++++----
4 files changed, 63 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab80e277/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index f027c29..1da2dbc 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -716,6 +716,9 @@ Release 2.8.0 - UNRELEASED
YARN-3971. Skip RMNodeLabelsManager#checkRemoveFromClusterNodeLabelsOfQueue
on nodelabel recovery. (Bibin A Chundatt via wangda)
+ YARN-433. When RM is catching up with node updates then it should not expire
+ acquired containers. (Xuan Gong via zxu)
+
Release 2.7.2 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab80e277/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index f7d3f56..940f76f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -99,9 +99,9 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
// Transitions from ACQUIRED state
.addTransition(RMContainerState.ACQUIRED, RMContainerState.RUNNING,
- RMContainerEventType.LAUNCHED, new LaunchedTransition())
+ RMContainerEventType.LAUNCHED)
.addTransition(RMContainerState.ACQUIRED, RMContainerState.COMPLETED,
- RMContainerEventType.FINISHED, new ContainerFinishedAtAcquiredState())
+ RMContainerEventType.FINISHED, new FinishedTransition())
.addTransition(RMContainerState.ACQUIRED, RMContainerState.RELEASED,
RMContainerEventType.RELEASED, new KillTransition())
.addTransition(RMContainerState.ACQUIRED, RMContainerState.EXPIRED,
@@ -486,16 +486,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
}
}
- private static final class LaunchedTransition extends BaseTransition {
-
- @Override
- public void transition(RMContainerImpl container, RMContainerEvent event) {
- // Unregister from containerAllocationExpirer.
- container.containerAllocationExpirer.unregister(container
- .getContainerId());
- }
- }
-
private static final class ContainerRescheduledTransition extends
FinishedTransition {
@@ -554,19 +544,6 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
}
}
- private static final class ContainerFinishedAtAcquiredState extends
- FinishedTransition {
- @Override
- public void transition(RMContainerImpl container, RMContainerEvent event) {
- // Unregister from containerAllocationExpirer.
- container.containerAllocationExpirer.unregister(container
- .getContainerId());
-
- // Inform AppAttempt
- super.transition(container, event);
- }
- }
-
private static final class KillTransition extends FinishedTransition {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab80e277/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 09b9278..f182d02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
@@ -107,6 +108,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
private long lastHealthReportTime;
private String nodeManagerVersion;
+ private final ContainerAllocationExpirer containerAllocationExpirer;
/* set of containers that have just launched */
private final Set<ContainerId> launchedContainers =
new HashSet<ContainerId>();
@@ -265,6 +267,8 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
this.stateMachine = stateMachineFactory.make(this);
this.nodeUpdateQueue = new ConcurrentLinkedQueue<UpdatedContainerInfo>();
+
+ this.containerAllocationExpirer = context.getContainerAllocationExpirer();
}
@Override
@@ -953,11 +957,15 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
// Just launched container. RM knows about it the first time.
launchedContainers.add(containerId);
newlyLaunchedContainers.add(remoteContainer);
+ // Unregister from containerAllocationExpirer.
+ containerAllocationExpirer.unregister(containerId);
}
} else {
// A finished container
launchedContainers.remove(containerId);
completedContainers.add(remoteContainer);
+ // Unregister from containerAllocationExpirer.
+ containerAllocationExpirer.unregister(containerId);
}
}
if (newlyLaunchedContainers.size() != 0 || completedContainers.size() != 0) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ab80e277/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index ece896b..4964c59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -31,6 +31,7 @@ import java.util.Collections;
import java.util.List;
import org.apache.hadoop.util.HostsFileReader;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -43,6 +44,7 @@ import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.event.InlineDispatcher;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
@@ -105,8 +107,9 @@ public class TestRMNodeTransitions {
InlineDispatcher rmDispatcher = new InlineDispatcher();
rmContext =
- new RMContextImpl(rmDispatcher, null, null, null,
- mock(DelegationTokenRenewer.class), null, null, null, null, null);
+ new RMContextImpl(rmDispatcher, mock(ContainerAllocationExpirer.class),
+ null, null, mock(DelegationTokenRenewer.class), null, null, null,
+ null, null);
NodesListManager nodesListManager = mock(NodesListManager.class);
HostsFileReader reader = mock(HostsFileReader.class);
when(nodesListManager.getHostsReader()).thenReturn(reader);
@@ -147,7 +150,8 @@ public class TestRMNodeTransitions {
public void tearDown() throws Exception {
}
- private RMNodeStatusEvent getMockRMNodeStatusEvent() {
+ private RMNodeStatusEvent getMockRMNodeStatusEvent(
+ List<ContainerStatus> containerStatus) {
NodeHeartbeatResponse response = mock(NodeHeartbeatResponse.class);
NodeHealthStatus healthStatus = mock(NodeHealthStatus.class);
@@ -158,6 +162,9 @@ public class TestRMNodeTransitions {
doReturn(healthStatus).when(event).getNodeHealthStatus();
doReturn(response).when(event).getLatestResponse();
doReturn(RMNodeEventType.STATUS_UPDATE).when(event).getType();
+ if (containerStatus != null) {
+ doReturn(containerStatus).when(event).getContainers();
+ }
return event;
}
@@ -176,7 +183,7 @@ public class TestRMNodeTransitions {
// Now verify that scheduler isn't notified of an expired container
// by checking number of 'completedContainers' it got in the previous event
- RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
+ RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(null);
ContainerStatus containerStatus = mock(ContainerStatus.class);
doReturn(completedContainerId).when(containerStatus).getContainerId();
doReturn(Collections.singletonList(containerStatus)).
@@ -207,11 +214,11 @@ public class TestRMNodeTransitions {
ContainerId completedContainerIdFromNode2_2 = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
BuilderUtils.newApplicationId(1, 1), 1), 2);
-
- RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent();
- RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent();
- RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent();
-
+
+ RMNodeStatusEvent statusEventFromNode1 = getMockRMNodeStatusEvent(null);
+ RMNodeStatusEvent statusEventFromNode2_1 = getMockRMNodeStatusEvent(null);
+ RMNodeStatusEvent statusEventFromNode2_2 = getMockRMNodeStatusEvent(null);
+
ContainerStatus containerStatusFromNode1 = mock(ContainerStatus.class);
ContainerStatus containerStatusFromNode2_1 = mock(ContainerStatus.class);
ContainerStatus containerStatusFromNode2_2 = mock(ContainerStatus.class);
@@ -263,8 +270,8 @@ public class TestRMNodeTransitions {
BuilderUtils.newApplicationAttemptId(
BuilderUtils.newApplicationId(1, 1), 1), 1);
- RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent();
- RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent();
+ RMNodeStatusEvent statusEvent1 = getMockRMNodeStatusEvent(null);
+ RMNodeStatusEvent statusEvent2 = getMockRMNodeStatusEvent(null);
ContainerStatus containerStatus1 = mock(ContainerStatus.class);
ContainerStatus containerStatus2 = mock(ContainerStatus.class);
@@ -499,7 +506,7 @@ public class TestRMNodeTransitions {
// Verify status update does not clear containers/apps to cleanup
// but updating heartbeat response for cleanup does
- RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent();
+ RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(null);
node.handle(statusEvent);
Assert.assertEquals(1, node.getContainersToCleanUp().size());
Assert.assertEquals(1, node.getAppsToCleanup().size());
@@ -706,4 +713,35 @@ public class TestRMNodeTransitions {
null, null));
Assert.assertEquals(nmVersion2, node.getNodeManagerVersion());
}
+
+ @Test
+ public void testContainerExpire() throws Exception {
+ ContainerAllocationExpirer mockExpirer =
+ mock(ContainerAllocationExpirer.class);
+ ApplicationId appId =
+ ApplicationId.newInstance(System.currentTimeMillis(), 1);
+ ApplicationAttemptId appAttemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1L);
+ ContainerId containerId2 = ContainerId.newContainerId(appAttemptId, 2L);
+ mockExpirer.register(containerId1);
+ mockExpirer.register(containerId2);
+ verify(mockExpirer).register(containerId1);
+ verify(mockExpirer).register(containerId2);
+ ((RMContextImpl) rmContext).setContainerAllocationExpirer(mockExpirer);
+ RMNodeImpl rmNode = getRunningNode();
+ ContainerStatus status1 =
+ ContainerStatus
+ .newInstance(containerId1, ContainerState.RUNNING, "", 0);
+ ContainerStatus status2 =
+ ContainerStatus.newInstance(containerId2, ContainerState.COMPLETE, "",
+ 0);
+ List<ContainerStatus> statusList = new ArrayList<ContainerStatus>();
+ statusList.add(status1);
+ statusList.add(status2);
+ RMNodeStatusEvent statusEvent = getMockRMNodeStatusEvent(statusList);
+ rmNode.handle(statusEvent);
+ verify(mockExpirer).unregister(containerId1);
+ verify(mockExpirer).unregister(containerId2);
+ }
}