You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by mi...@apache.org on 2015/07/10 17:31:22 UTC
hadoop git commit: YARN-3445. Cache runningApps in RMNode for getting
running apps on given NodeId. (Junping Du via mingma)
Repository: hadoop
Updated Branches:
refs/heads/trunk b48908033 -> 08244264c
YARN-3445. Cache runningApps in RMNode for getting running apps on given NodeId. (Junping Du via mingma)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/08244264
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/08244264
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/08244264
Branch: refs/heads/trunk
Commit: 08244264c0583472b9c4e16591cfde72c6db62a2
Parents: b489080
Author: Ming Ma <mi...@apache.org>
Authored: Fri Jul 10 08:30:10 2015 -0700
Committer: Ming Ma <mi...@apache.org>
Committed: Fri Jul 10 08:30:10 2015 -0700
----------------------------------------------------------------------
.../hadoop/yarn/sls/nodemanager/NodeInfo.java | 8 +++-
.../yarn/sls/scheduler/RMNodeWrapper.java | 5 +++
hadoop-yarn-project/CHANGES.txt | 3 ++
.../server/resourcemanager/rmnode/RMNode.java | 2 +
.../resourcemanager/rmnode/RMNodeImpl.java | 43 ++++++++++++++++----
.../yarn/server/resourcemanager/MockNodes.java | 5 +++
.../resourcemanager/TestRMNodeTransitions.java | 36 ++++++++++++++--
7 files changed, 91 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index ee6eb7b..440779c 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -62,7 +62,8 @@ public class NodeInfo {
private NodeState state;
private List<ContainerId> toCleanUpContainers;
private List<ApplicationId> toCleanUpApplications;
-
+ private List<ApplicationId> runningApplications;
+
public FakeRMNodeImpl(NodeId nodeId, String nodeAddr, String httpAddress,
Resource perNode, String rackName, String healthReport,
int cmdPort, String hostName, NodeState state) {
@@ -77,6 +78,7 @@ public class NodeInfo {
this.state = state;
toCleanUpApplications = new ArrayList<ApplicationId>();
toCleanUpContainers = new ArrayList<ContainerId>();
+ runningApplications = new ArrayList<ApplicationId>();
}
public NodeId getNodeID() {
@@ -135,6 +137,10 @@ public class NodeInfo {
return toCleanUpApplications;
}
+ public List<ApplicationId> getRunningApps() {
+ return runningApplications;
+ }
+
public void updateNodeHeartbeatResponseForCleanup(
NodeHeartbeatResponse response) {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index b64be1b..a6633ae 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -119,6 +119,11 @@ public class RMNodeWrapper implements RMNode {
}
@Override
+ public List<ApplicationId> getRunningApps() {
+ return node.getRunningApps();
+ }
+
+ @Override
public void updateNodeHeartbeatResponseForCleanup(
NodeHeartbeatResponse nodeHeartbeatResponse) {
node.updateNodeHeartbeatResponseForCleanup(nodeHeartbeatResponse);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 2a9ff98..db000d7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -1678,6 +1678,9 @@ Release 2.6.0 - 2014-11-18
YARN-2811. In Fair Scheduler, reservation fulfillments shouldn't ignore max
share (Siqi Li via Sandy Ryza)
+ YARN-3445. Cache runningApps in RMNode for getting running apps on given
+ NodeId. (Junping Du via mingma)
+
IMPROVEMENTS
YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index 95eeaf6..0386be6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -119,6 +119,8 @@ public interface RMNode {
public List<ApplicationId> getAppsToCleanup();
+ List<ApplicationId> getRunningApps();
+
/**
* Update a {@link NodeHeartbeatResponse} with the list of containers and
* applications to clean up for this node.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index d1e6190..9bc91c7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -123,11 +123,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
new HashSet<ContainerId>();
/* the list of applications that have finished and need to be purged */
- private final List<ApplicationId> finishedApplications = new ArrayList<ApplicationId>();
+ private final List<ApplicationId> finishedApplications =
+ new ArrayList<ApplicationId>();
+
+ /* the list of applications that are running on this node */
+ private final List<ApplicationId> runningApplications =
+ new ArrayList<ApplicationId>();
private NodeHeartbeatResponse latestNodeHeartBeatResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
-
+
private static final StateMachineFactory<RMNodeImpl,
NodeState,
RMNodeEventType,
@@ -136,7 +141,7 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
NodeState,
RMNodeEventType,
RMNodeEvent>(NodeState.NEW)
-
+
//Transitions from NEW state
.addTransition(NodeState.NEW, NodeState.RUNNING,
RMNodeEventType.STARTED, new AddNodeTransition())
@@ -383,6 +388,16 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
}
@Override
+ public List<ApplicationId> getRunningApps() {
+ this.readLock.lock();
+ try {
+ return new ArrayList<ApplicationId>(this.runningApplications);
+ } finally {
+ this.readLock.unlock();
+ }
+ }
+
+ @Override
public List<ContainerId> getContainersToCleanUp() {
this.readLock.lock();
@@ -519,9 +534,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
LOG.warn("Cannot get RMApp by appId=" + appId
+ ", just added it to finishedApplications list for cleanup");
rmNode.finishedApplications.add(appId);
+ rmNode.runningApplications.remove(appId);
return;
}
+ // Add running applications back due to Node add or Node reconnection.
+ rmNode.runningApplications.add(appId);
context.getDispatcher().getEventHandler()
.handle(new RMAppRunningOnNodeEvent(appId, nodeId));
}
@@ -707,8 +725,9 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
@Override
public void transition(RMNodeImpl rmNode, RMNodeEvent event) {
- rmNode.finishedApplications.add(((
- RMNodeCleanAppEvent) event).getAppId());
+ ApplicationId appId = ((RMNodeCleanAppEvent) event).getAppId();
+ rmNode.finishedApplications.add(appId);
+ rmNode.runningApplications.remove(appId);
}
}
@@ -910,12 +929,22 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
+ "cleanup, no further processing");
continue;
}
- if (finishedApplications.contains(containerId.getApplicationAttemptId()
- .getApplicationId())) {
+
+ ApplicationId containerAppId =
+ containerId.getApplicationAttemptId().getApplicationId();
+
+ if (finishedApplications.contains(containerAppId)) {
LOG.info("Container " + containerId
+ " belongs to an application that is already killed,"
+ " no further processing");
continue;
+ } else if (!runningApplications.contains(containerAppId)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Container " + containerId
+ + " is the first container get launched for application "
+ + containerAppId);
+ }
+ runningApplications.add(containerAppId);
}
// Process running containers
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
index 2d863d1..095fe28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNodes.java
@@ -187,6 +187,11 @@ public class MockNodes {
}
@Override
+ public List<ApplicationId> getRunningApps() {
+ return null;
+ }
+
+ @Override
public void updateNodeHeartbeatResponseForCleanup(NodeHeartbeatResponse response) {
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/08244264/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
index 01f4357..ece896b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
@@ -33,6 +33,7 @@ import java.util.List;
import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeState;
@@ -485,9 +486,9 @@ public class TestRMNodeTransitions {
NodeId nodeId = node.getNodeID();
// Expire a container
- ContainerId completedContainerId = BuilderUtils.newContainerId(
- BuilderUtils.newApplicationAttemptId(
- BuilderUtils.newApplicationId(0, 0), 0), 0);
+ ContainerId completedContainerId = BuilderUtils.newContainerId(
+ BuilderUtils.newApplicationAttemptId(
+ BuilderUtils.newApplicationId(0, 0), 0), 0);
node.handle(new RMNodeCleanContainerEvent(nodeId, completedContainerId));
Assert.assertEquals(1, node.getContainersToCleanUp().size());
@@ -512,6 +513,35 @@ public class TestRMNodeTransitions {
Assert.assertEquals(finishedAppId, hbrsp.getApplicationsToCleanup().get(0));
}
+ @Test(timeout=20000)
+ public void testUpdateHeartbeatResponseForAppLifeCycle() {
+ RMNodeImpl node = getRunningNode();
+ NodeId nodeId = node.getNodeID();
+
+ ApplicationId runningAppId = BuilderUtils.newApplicationId(0, 1);
+ // Create a running container
+ ContainerId runningContainerId = BuilderUtils.newContainerId(
+ BuilderUtils.newApplicationAttemptId(
+ runningAppId, 0), 0);
+
+ ContainerStatus status = ContainerStatus.newInstance(runningContainerId,
+ ContainerState.RUNNING, "", 0);
+ List<ContainerStatus> statusList = new ArrayList<ContainerStatus>();
+ statusList.add(status);
+ NodeHealthStatus nodeHealth = NodeHealthStatus.newInstance(true,
+ "", System.currentTimeMillis());
+ node.handle(new RMNodeStatusEvent(nodeId, nodeHealth,
+ statusList, null, null));
+
+ Assert.assertEquals(1, node.getRunningApps().size());
+
+ // Finish an application
+ ApplicationId finishedAppId = runningAppId;
+ node.handle(new RMNodeCleanAppEvent(nodeId, finishedAppId));
+ Assert.assertEquals(1, node.getAppsToCleanup().size());
+ Assert.assertEquals(0, node.getRunningApps().size());
+ }
+
private RMNodeImpl getRunningNode() {
return getRunningNode(null, 0);
}