You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ji...@apache.org on 2014/06/17 01:56:13 UTC
svn commit: r1603028 [1/2] - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/serve...
Author: jianhe
Date: Mon Jun 16 23:56:12 2014
New Revision: 1603028
URL: http://svn.apache.org/r1603028
Log:
YARN-1885. Fixed a bug where the RM might not send the application clean-up signal to NMs on which completed applications had previously run, in case of an RM restart. Contributed by Wangda Tan
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRunningOnNodeEvent.java
Removed:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/event/RMAppAttemptContainerAcquiredEvent.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Mon Jun 16 23:56:12 2014
@@ -248,6 +248,10 @@ Release 2.5.0 - UNRELEASED
YARN-2155. FairScheduler: Incorrect threshold check for preemption.
(Wei Yan via kasha)
+ YARN-1885. Fixed a bug that RM may not send application-clean-up signal
+ to NMs where the completed applications previously ran in case of RM restart.
+ (Wangda Tan via jianhe)
+
Release 2.4.1 - 2014-06-23
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java Mon Jun 16 23:56:12 2014
@@ -60,7 +60,7 @@ public class TestResourceTrackerOnHA ext
// make sure registerNodeManager works when failover happens
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, 0, resource,
- YarnVersionInfo.getVersion(), null);
+ YarnVersionInfo.getVersion(), null, null);
resourceTracker.registerNodeManager(request);
Assert.assertTrue(waitForNodeManagerToConnect(10000, nodeId));
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java Mon Jun 16 23:56:12 2014
@@ -20,15 +20,17 @@ package org.apache.hadoop.yarn.server.ap
import java.util.List;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
public abstract class RegisterNodeManagerRequest {
-
+
public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
int httpPort, Resource resource, String nodeManagerVersionId,
- List<NMContainerStatus> containerStatuses) {
+ List<NMContainerStatus> containerStatuses,
+ List<ApplicationId> runningApplications) {
RegisterNodeManagerRequest request =
Records.newRecord(RegisterNodeManagerRequest.class);
request.setHttpPort(httpPort);
@@ -36,6 +38,7 @@ public abstract class RegisterNodeManage
request.setNodeId(nodeId);
request.setNMVersion(nodeManagerVersionId);
request.setContainerStatuses(containerStatuses);
+ request.setRunningApplications(runningApplications);
return request;
}
@@ -45,10 +48,30 @@ public abstract class RegisterNodeManage
public abstract String getNMVersion();
public abstract List<NMContainerStatus> getNMContainerStatuses();
+ /**
+ * We introduce this here because the YARN RM currently doesn't persist which
+ * nodes each application has run on. After an RM restart, it therefore cannot
+ * determine whether a node should perform application cleanup (such as log
+ * aggregation or status update).
+ * <p>
+ * With the running-application list in the node manager registration request,
+ * the RM can recover this node information and then take actions accordingly.
+ *
+ * @return running application list in this node
+ */
+ public abstract List<ApplicationId> getRunningApplications();
+
public abstract void setNodeId(NodeId nodeId);
public abstract void setHttpPort(int port);
public abstract void setResource(Resource resource);
public abstract void setNMVersion(String version);
public abstract void setContainerStatuses(
List<NMContainerStatus> containerStatuses);
+
+ /**
+ * Setter for {@link RegisterNodeManagerRequest#getRunningApplications()}.
+ * @param runningApplications running applications on this node
+ */
+ public abstract void setRunningApplications(
+ List<ApplicationId> runningApplications);
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java Mon Jun 16 23:56:12 2014
@@ -20,12 +20,23 @@ package org.apache.hadoop.yarn.server.ap
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
@@ -44,6 +55,7 @@ public class RegisterNodeManagerRequestP
private Resource resource = null;
private NodeId nodeId = null;
private List<NMContainerStatus> containerStatuses = null;
+ private List<ApplicationId> runningApplications = null;
public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder();
@@ -65,6 +77,9 @@ public class RegisterNodeManagerRequestP
if (this.containerStatuses != null) {
addNMContainerStatusesToProto();
}
+ if (this.runningApplications != null) {
+ addRunningApplicationsToProto();
+ }
if (this.resource != null) {
builder.setResource(convertToProtoFormat(this.resource));
}
@@ -158,6 +173,66 @@ public class RegisterNodeManagerRequestP
maybeInitBuilder();
builder.setHttpPort(httpPort);
}
+
+ @Override
+ public List<ApplicationId> getRunningApplications() {
+ initRunningApplications();
+ return runningApplications;
+ }
+
+ private void initRunningApplications() {
+ if (this.runningApplications != null) {
+ return;
+ }
+ RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
+ List<ApplicationIdProto> list = p.getRunningApplicationsList();
+ this.runningApplications = new ArrayList<ApplicationId>();
+ for (ApplicationIdProto c : list) {
+ this.runningApplications.add(convertFromProtoFormat(c));
+ }
+ }
+
+ @Override
+ public void setRunningApplications(List<ApplicationId> apps) {
+ if (apps == null) {
+ return;
+ }
+ initRunningApplications();
+ this.runningApplications.addAll(apps);
+ }
+
+ private void addRunningApplicationsToProto() {
+ maybeInitBuilder();
+ builder.clearRunningApplications();
+ if (runningApplications == null) {
+ return;
+ }
+ Iterable<ApplicationIdProto> it = new Iterable<ApplicationIdProto>() {
+
+ @Override
+ public Iterator<ApplicationIdProto> iterator() {
+ return new Iterator<ApplicationIdProto>() {
+ Iterator<ApplicationId> iter = runningApplications.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public ApplicationIdProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllRunningApplications(it);
+ }
@Override
public List<NMContainerStatus> getNMContainerStatuses() {
@@ -216,6 +291,14 @@ public class RegisterNodeManagerRequestP
maybeInitBuilder();
builder.setNmVersion(version);
}
+
+ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
+ return new ApplicationIdPBImpl(p);
+ }
+
+ private ApplicationIdProto convertToProtoFormat(ApplicationId t) {
+ return ((ApplicationIdPBImpl)t).getProto();
+ }
private NodeIdPBImpl convertFromProtoFormat(NodeIdProto p) {
return new NodeIdPBImpl(p);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto Mon Jun 16 23:56:12 2014
@@ -31,6 +31,7 @@ message RegisterNodeManagerRequestProto
optional ResourceProto resource = 4;
optional string nm_version = 5;
repeated NMContainerStatusProto container_statuses = 6;
+ repeated ApplicationIdProto runningApplications = 7;
}
message RegisterNodeManagerResponseProto {
@@ -66,4 +67,4 @@ message NMContainerStatusProto {
optional PriorityProto priority = 4;
optional string diagnostics = 5 [default = "N/A"];
optional int32 container_exit_status = 6;
-}
\ No newline at end of file
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java Mon Jun 16 23:56:12 2014
@@ -83,7 +83,8 @@ public class TestProtocolRecords {
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(
NodeId.newInstance("1.1.1.1", 1000), 8080,
- Resource.newInstance(1024, 1), "NM-version-id", reports);
+ Resource.newInstance(1024, 1), "NM-version-id", reports,
+ Arrays.asList(appId));
RegisterNodeManagerRequest requestProto =
new RegisterNodeManagerRequestPBImpl(
((RegisterNodeManagerRequestPBImpl) request).getProto());
@@ -95,5 +96,7 @@ public class TestProtocolRecords {
requestProto.getNodeId());
Assert.assertEquals(Resource.newInstance(1024, 1),
requestProto.getResource());
+ Assert.assertEquals(1, requestProto.getRunningApplications().size());
+ Assert.assertEquals(appId, requestProto.getRunningApplications().get(0));
}
}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java?rev=1603028&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestRegisterNodeManagerRequest.java Mon Jun 16 23:56:12 2014
@@ -0,0 +1,81 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.api.protocolrecords;
+
+import java.util.Arrays;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRegisterNodeManagerRequest {
+ @Test
+ public void testRegisterNodeManagerRequest() {
+ RegisterNodeManagerRequest request =
+ RegisterNodeManagerRequest.newInstance(
+ NodeId.newInstance("host", 1234), 1234, Resource.newInstance(0, 0),
+ "version", Arrays.asList(NMContainerStatus.newInstance(
+ ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(
+ ApplicationId.newInstance(1234L, 1), 1), 1),
+ ContainerState.RUNNING, Resource.newInstance(1024, 1), "good",
+ -1)), Arrays.asList(ApplicationId.newInstance(1234L, 1),
+ ApplicationId.newInstance(1234L, 2)));
+
+ // serialize to proto, and get request from proto
+ RegisterNodeManagerRequest request1 =
+ new RegisterNodeManagerRequestPBImpl(
+ ((RegisterNodeManagerRequestPBImpl) request).getProto());
+
+ // check values
+ Assert.assertEquals(request1.getNMContainerStatuses().size(), request
+ .getNMContainerStatuses().size());
+ Assert.assertEquals(request1.getNMContainerStatuses().get(0).getContainerId(),
+ request.getNMContainerStatuses().get(0).getContainerId());
+ Assert.assertEquals(request1.getRunningApplications().size(), request
+ .getRunningApplications().size());
+ Assert.assertEquals(request1.getRunningApplications().get(0), request
+ .getRunningApplications().get(0));
+ Assert.assertEquals(request1.getRunningApplications().get(1), request
+ .getRunningApplications().get(1));
+ }
+
+ @Test
+ public void testRegisterNodeManagerRequestWithNullArrays() {
+ RegisterNodeManagerRequest request =
+ RegisterNodeManagerRequest.newInstance(NodeId.newInstance("host", 1234),
+ 1234, Resource.newInstance(0, 0), "version", null, null);
+
+ // serialize to proto, and get request from proto
+ RegisterNodeManagerRequest request1 =
+ new RegisterNodeManagerRequestPBImpl(
+ ((RegisterNodeManagerRequestPBImpl) request).getProto());
+
+ // check values
+ Assert.assertEquals(0, request1.getNMContainerStatuses().size());
+ Assert.assertEquals(0, request1.getRunningApplications().size());
+ }
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Mon Jun 16 23:56:12 2014
@@ -250,7 +250,7 @@ public class NodeStatusUpdaterImpl exten
List<NMContainerStatus> containerReports = getNMContainerStatuses();
RegisterNodeManagerRequest request =
RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
- nodeManagerVersionId, containerReports);
+ nodeManagerVersionId, containerReports, getRunningApplications());
if (containerReports != null) {
LOG.info("Registering with RM using containers :" + containerReports);
}
@@ -374,6 +374,12 @@ public class NodeStatusUpdaterImpl exten
}
return containerStatuses;
}
+
+ private List<ApplicationId> getRunningApplications() {
+ List<ApplicationId> runningApplications = new ArrayList<ApplicationId>();
+ runningApplications.addAll(this.context.getApplications().keySet());
+ return runningApplications;
+ }
// These NMContainerStatus are sent on NM registration and used by YARN only.
private List<NMContainerStatus> getNMContainerStatuses() {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Mon Jun 16 23:56:12 2014
@@ -244,15 +244,6 @@ public class ResourceTrackerService exte
Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion();
- if (!rmContext.isWorkPreservingRecoveryEnabled()) {
- if (!request.getNMContainerStatuses().isEmpty()) {
- LOG.info("received container statuses on node manager register :"
- + request.getNMContainerStatuses());
- for (NMContainerStatus status : request.getNMContainerStatuses()) {
- handleNMContainerStatus(status);
- }
- }
- }
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
@@ -311,7 +302,8 @@ public class ResourceTrackerService exte
RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
if (oldNode == null) {
this.rmContext.getDispatcher().getEventHandler().handle(
- new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses()));
+ new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(),
+ request.getRunningApplications()));
} else {
LOG.info("Reconnect from the node at: " + host);
this.nmLivelinessMonitor.unregister(nodeId);
@@ -322,6 +314,18 @@ public class ResourceTrackerService exte
// present for any running application.
this.nmTokenSecretManager.removeNodeKey(nodeId);
this.nmLivelinessMonitor.register(nodeId);
+
+ // Handle received container statuses; this must be done after the new
+ // RMNode has been inserted
+ if (!rmContext.isWorkPreservingRecoveryEnabled()) {
+ if (!request.getNMContainerStatuses().isEmpty()) {
+ LOG.info("received container statuses on node manager register :"
+ + request.getNMContainerStatuses());
+ for (NMContainerStatus status : request.getNMContainerStatuses()) {
+ handleNMContainerStatus(status);
+ }
+ }
+ }
String message =
"NodeManager from node " + host + "(cmPort: " + cmPort + " httpPort: "
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java Mon Jun 16 23:56:12 2014
@@ -19,16 +19,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
import java.util.Collection;
-
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -208,6 +208,14 @@ public interface RMApp extends EventHand
* @return the flag indicating whether the applications's state is stored.
*/
boolean isAppFinalStateStored();
+
+
+ /**
+ * Returns the nodes on which containers for this {@link RMApp} ran.
+ *
+ * @return the set of nodes that ran any containers from this {@link RMApp}
+ */
+ Set<NodeId> getRanNodes();
/**
* Create the external user-facing state of ApplicationMaster from the
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppEventType.java Mon Jun 16 23:56:12 2014
@@ -38,6 +38,9 @@ public enum RMAppEventType {
ATTEMPT_FAILED,
ATTEMPT_KILLED,
NODE_UPDATE,
+
+ // Source: Container and ResourceTracker
+ APP_RUNNING_ON_NODE,
// Source: RMStateStore
APP_NEW_SAVED,
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Mon Jun 16 23:56:12 2014
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -71,7 +72,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedSchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -116,6 +116,7 @@ public class RMAppImpl implements RMApp,
private EventHandler handler;
private static final AppFinishedTransition FINISHED_TRANSITION =
new AppFinishedTransition();
+ private Set<NodeId> ranNodes = new ConcurrentSkipListSet<NodeId>();
// These states stored are only valid when app is at killing or final_saving.
private RMAppState stateBeforeKilling;
@@ -180,7 +181,6 @@ public class RMAppImpl implements RMApp,
new FinalSavingTransition(
new AppKilledTransition(), RMAppState.KILLED))
-
// Transitions from ACCEPTED state
.addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
@@ -200,6 +200,9 @@ public class RMAppImpl implements RMApp,
new FinalSavingTransition(FINISHED_TRANSITION, RMAppState.FINISHED))
.addTransition(RMAppState.ACCEPTED, RMAppState.KILLING,
RMAppEventType.KILL, new KillAttemptTransition())
+ .addTransition(RMAppState.ACCEPTED, RMAppState.ACCEPTED,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
// ACCECPTED state can once again receive APP_ACCEPTED event, because on
// recovery the app returns ACCEPTED state and the app once again go
// through the scheduler and triggers one more APP_ACCEPTED event at
@@ -220,6 +223,9 @@ public class RMAppImpl implements RMApp,
.addTransition(RMAppState.RUNNING, RMAppState.FINISHED,
// UnManagedAM directly jumps to finished
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+ .addTransition(RMAppState.RUNNING, RMAppState.RUNNING,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
.addTransition(RMAppState.RUNNING,
EnumSet.of(RMAppState.ACCEPTED, RMAppState.FINAL_SAVING),
RMAppEventType.ATTEMPT_FAILED,
@@ -235,6 +241,9 @@ public class RMAppImpl implements RMApp,
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_FINISHED,
new AttemptFinishedAtFinalSavingTransition())
+ .addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
// ignorable transitions
.addTransition(RMAppState.FINAL_SAVING, RMAppState.FINAL_SAVING,
EnumSet.of(RMAppEventType.NODE_UPDATE, RMAppEventType.KILL,
@@ -243,6 +252,9 @@ public class RMAppImpl implements RMApp,
// Transitions from FINISHING state
.addTransition(RMAppState.FINISHING, RMAppState.FINISHED,
RMAppEventType.ATTEMPT_FINISHED, FINISHED_TRANSITION)
+ .addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
// ignorable transitions
.addTransition(RMAppState.FINISHING, RMAppState.FINISHING,
EnumSet.of(RMAppEventType.NODE_UPDATE,
@@ -251,6 +263,9 @@ public class RMAppImpl implements RMApp,
RMAppEventType.KILL))
// Transitions from KILLING state
+ .addTransition(RMAppState.KILLING, RMAppState.KILLING,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
.addTransition(RMAppState.KILLING, RMAppState.FINAL_SAVING,
RMAppEventType.ATTEMPT_KILLED,
new FinalSavingTransition(
@@ -267,6 +282,9 @@ public class RMAppImpl implements RMApp,
// Transitions from FINISHED state
// ignorable transitions
+ .addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
.addTransition(RMAppState.FINISHED, RMAppState.FINISHED,
EnumSet.of(
RMAppEventType.NODE_UPDATE,
@@ -276,11 +294,17 @@ public class RMAppImpl implements RMApp,
// Transitions from FAILED state
// ignorable transitions
+ .addTransition(RMAppState.FAILED, RMAppState.FAILED,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
.addTransition(RMAppState.FAILED, RMAppState.FAILED,
EnumSet.of(RMAppEventType.KILL, RMAppEventType.NODE_UPDATE))
// Transitions from KILLED state
// ignorable transitions
+ .addTransition(RMAppState.KILLED, RMAppState.KILLED,
+ RMAppEventType.APP_RUNNING_ON_NODE,
+ new AppRunningOnNodeTransition())
.addTransition(
RMAppState.KILLED,
RMAppState.KILLED,
@@ -695,6 +719,23 @@ public class RMAppImpl implements RMApp,
nodeUpdateEvent.getNode());
};
}
+
+ private static final class AppRunningOnNodeTransition extends RMAppTransition {
+ @Override
+ public void transition(RMAppImpl app, RMAppEvent event) {
+ RMAppRunningOnNodeEvent runningEvent = (RMAppRunningOnNodeEvent) event;
+ // App already in a final state (per isAppInFinalState): ask this node to
+ // clean the app up right away instead of recording it in ranNodes.
+ if (isAppInFinalState(app)) {
+ app.handler.handle(new RMNodeCleanAppEvent(
+ runningEvent.getNodeId(), runningEvent.getApplicationId()));
+ return;
+ }
+ // Otherwise record the node; the final-state transition later sends a
+ // clean-up event to every node in ranNodes.
+ app.ranNodes.add(runningEvent.getNodeId());
+ }
+ }
/**
* Move an app to a new queue.
@@ -1037,17 +1078,8 @@ public class RMAppImpl implements RMApp,
this.finalState = finalState;
}
- private Set<NodeId> getNodesOnWhichAttemptRan(RMAppImpl app) {
- Set<NodeId> nodes = new HashSet<NodeId>();
- for (RMAppAttempt attempt : app.attempts.values()) {
- nodes.addAll(attempt.getRanNodes());
- }
- return nodes;
- }
-
public void transition(RMAppImpl app, RMAppEvent event) {
- Set<NodeId> nodes = getNodesOnWhichAttemptRan(app);
- for (NodeId nodeId : nodes) {
+ for (NodeId nodeId : app.getRanNodes()) {
app.handler.handle(
new RMNodeCleanAppEvent(nodeId, app.applicationId));
}
@@ -1148,4 +1180,9 @@ public class RMAppImpl implements RMApp,
private RMAppState getRecoveredFinalState() {
return this.recoveredFinalState;
}
+
+ @Override
+ public Set<NodeId> getRanNodes() {
+ return this.ranNodes; // the live set itself, not a snapshot
+ }
}
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRunningOnNodeEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRunningOnNodeEvent.java?rev=1603028&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRunningOnNodeEvent.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppRunningOnNodeEvent.java Mon Jun 16 23:56:12 2014
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.rmapp;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+
+public class RMAppRunningOnNodeEvent extends RMAppEvent {
+ private final NodeId nodeId; // node the application was seen running on
+ /** @param nodeId node reported to be running containers of appId */
+ public RMAppRunningOnNodeEvent(ApplicationId appId, NodeId nodeId) {
+ super(appId, RMAppEventType.APP_RUNNING_ON_NODE);
+ this.nodeId = nodeId;
+ }
+ /** @return the node on which the application was seen running */
+ public NodeId getNodeId() {
+ return this.nodeId;
+ }
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttempt.java Mon Jun 16 23:56:12 2014
@@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt;
import java.util.List;
-import java.util.Set;
import javax.crypto.SecretKey;
@@ -32,7 +31,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -115,12 +113,6 @@ public interface RMAppAttempt extends Ev
FinalApplicationStatus getFinalApplicationStatus();
/**
- * Nodes on which the containers for this {@link RMAppAttempt} ran.
- * @return the set of nodes that ran any containers from this {@link RMAppAttempt}
- */
- Set<NodeId> getRanNodes();
-
- /**
* Return a list of the last set of finished containers, resetting the
* finished containers to empty.
* @return the list of just finished containers, re setting the finished containers.
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptEventType.java Mon Jun 16 23:56:12 2014
@@ -36,7 +36,6 @@ public enum RMAppAttemptEventType {
UNREGISTERED,
// Source: Containers
- CONTAINER_ACQUIRED,
CONTAINER_ALLOCATED,
CONTAINER_FINISHED,
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Mon Jun 16 23:56:12 2014
@@ -26,16 +26,13 @@ import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.EnumSet;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import javax.crypto.SecretKey;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -54,7 +51,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -80,7 +76,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
@@ -103,6 +98,8 @@ import org.apache.hadoop.yarn.state.Stat
import org.apache.hadoop.yarn.state.StateMachineFactory;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
+import com.google.common.annotations.VisibleForTesting;
+
@SuppressWarnings({"unchecked", "rawtypes"})
public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@@ -133,10 +130,7 @@ public class RMAppAttemptImpl implements
private final ApplicationSubmissionContext submissionContext;
private Token<AMRMTokenIdentifier> amrmToken = null;
private SecretKey clientTokenMasterKey = null;
-
- //nodes on while this attempt's containers ran
- private Set<NodeId> ranNodes =
- new HashSet<NodeId>();
+
private List<ContainerStatus> justFinishedContainers =
new ArrayList<ContainerStatus>();
private Container masterContainer;
@@ -219,10 +213,7 @@ public class RMAppAttemptImpl implements
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.ALLOCATED,
RMAppAttemptEventType.ATTEMPT_NEW_SAVED, new AttemptStoredTransition())
- .addTransition(RMAppAttemptState.ALLOCATED_SAVING,
- RMAppAttemptState.ALLOCATED_SAVING,
- RMAppAttemptEventType.CONTAINER_ACQUIRED,
- new ContainerAcquiredTransition())
+
// App could be killed by the client. So need to handle this.
.addTransition(RMAppAttemptState.ALLOCATED_SAVING,
RMAppAttemptState.FINAL_SAVING,
@@ -249,10 +240,6 @@ public class RMAppAttemptImpl implements
RMAppAttemptState.KILLED), RMAppAttemptState.KILLED))
// Transitions from ALLOCATED State
- .addTransition(RMAppAttemptState.ALLOCATED,
- RMAppAttemptState.ALLOCATED,
- RMAppAttemptEventType.CONTAINER_ACQUIRED,
- new ContainerAcquiredTransition())
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.LAUNCHED,
RMAppAttemptEventType.LAUNCHED, new AMLaunchedTransition())
.addTransition(RMAppAttemptState.ALLOCATED, RMAppAttemptState.FINAL_SAVING,
@@ -297,10 +284,6 @@ public class RMAppAttemptImpl implements
.addTransition(RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
RMAppAttemptEventType.CONTAINER_ALLOCATED)
.addTransition(
- RMAppAttemptState.RUNNING, RMAppAttemptState.RUNNING,
- RMAppAttemptEventType.CONTAINER_ACQUIRED,
- new ContainerAcquiredTransition())
- .addTransition(
RMAppAttemptState.RUNNING,
EnumSet.of(RMAppAttemptState.RUNNING, RMAppAttemptState.FINAL_SAVING),
RMAppAttemptEventType.CONTAINER_FINISHED,
@@ -337,7 +320,6 @@ public class RMAppAttemptImpl implements
// should be fixed to reject container allocate request at Final
// Saving in scheduler
RMAppAttemptEventType.CONTAINER_ALLOCATED,
- RMAppAttemptEventType.CONTAINER_ACQUIRED,
RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
RMAppAttemptEventType.KILL))
@@ -620,11 +602,6 @@ public class RMAppAttemptImpl implements
}
@Override
- public Set<NodeId> getRanNodes() {
- return ranNodes;
- }
-
- @Override
public Container getMasterContainer() {
this.readLock.lock();
@@ -705,7 +682,6 @@ public class RMAppAttemptImpl implements
public void transferStateFromPreviousAttempt(RMAppAttempt attempt) {
this.justFinishedContainers = attempt.getJustFinishedContainers();
- this.ranNodes = attempt.getRanNodes();
}
private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
@@ -1402,17 +1378,6 @@ public class RMAppAttemptImpl implements
finalStatus = unregisterEvent.getFinalApplicationStatus();
}
- private static final class ContainerAcquiredTransition extends
- BaseTransition {
- @Override
- public void transition(RMAppAttemptImpl appAttempt,
- RMAppAttemptEvent event) {
- RMAppAttemptContainerAcquiredEvent acquiredEvent
- = (RMAppAttemptContainerAcquiredEvent) event;
- appAttempt.ranNodes.add(acquiredEvent.getContainer().getNodeId());
- }
- }
-
private static final class ContainerFinishedTransition
implements
MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java Mon Jun 16 23:56:12 2014
@@ -37,7 +37,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAllocatedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
@@ -365,9 +365,9 @@ public class RMContainerImpl implements
RMContainerEventType.FINISHED));
return RMContainerState.COMPLETED;
} else if (report.getContainerState().equals(ContainerState.RUNNING)) {
- // Tell the appAttempt
- container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent(
- container.getApplicationAttemptId(), container.getContainer()));
+ // Tell the app
+ container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
+ .getApplicationAttemptId().getApplicationId(), container.nodeId));
return RMContainerState.RUNNING;
} else {
// This can never happen.
@@ -408,9 +408,9 @@ public class RMContainerImpl implements
// Register with containerAllocationExpirer.
container.containerAllocationExpirer.register(container.getContainerId());
- // Tell the appAttempt
- container.eventHandler.handle(new RMAppAttemptContainerAcquiredEvent(
- container.getApplicationAttemptId(), container.getContainer()));
+ // Tell the app
+ container.eventHandler.handle(new RMAppRunningOnNodeEvent(container
+ .getApplicationAttemptId().getApplicationId(), container.nodeId));
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Mon Jun 16 23:56:12 2014
@@ -55,6 +55,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.NodesListManagerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRunningOnNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -473,7 +475,13 @@ public class RMNodeImpl implements RMNod
} else {
// Increment activeNodes explicitly because this is a new node.
ClusterMetrics.getMetrics().incrNumActiveNodes();
- containers = startEvent.getContainerRecoveryReports();
+ containers = startEvent.getNMContainerStatuses();
+ }
+
+ if (null != startEvent.getRunningApplications()) {
+ for (ApplicationId appId : startEvent.getRunningApplications()) {
+ handleRunningAppOnNode(rmNode, rmNode.context, appId, rmNode.nodeId);
+ }
}
rmNode.context.getDispatcher().getEventHandler()
@@ -482,6 +490,24 @@ public class RMNodeImpl implements RMNod
new NodesListManagerEvent(
NodesListManagerEventType.NODE_USABLE, rmNode));
}
+
+ /** Routes an application reported as running on this node to its RMApp,
+ * or queues it on finishedApplications when the RM has no such app. */
+ void handleRunningAppOnNode(RMNodeImpl rmNode, RMContext context,
+ ApplicationId appId, NodeId nodeId) {
+ RMApp app = context.getRMApps().get(appId);
+ if (app != null) {
+ // Known app: tell it that it is running on nodeId.
+ context.getDispatcher().getEventHandler()
+ .handle(new RMAppRunningOnNodeEvent(appId, nodeId));
+ } else {
+ // Unknown appId (something may have gone wrong): let the NM clean
+ // it up by listing it among this node's finished applications.
+ LOG.warn("Cannot get RMApp by appId=" + appId
+ + ", just added it to finishedApplications list for cleanup");
+ rmNode.finishedApplications.add(appId);
+ }
+ }
}
public static class ReconnectNodeTransition implements
@@ -517,7 +543,7 @@ public class RMNodeImpl implements RMNod
}
rmNode.context.getRMNodes().put(newNode.getNodeID(), newNode);
rmNode.context.getDispatcher().getEventHandler().handle(
- new RMNodeStartedEvent(newNode.getNodeID(), null));
+ new RMNodeStartedEvent(newNode.getNodeID(), null, null));
}
rmNode.context.getDispatcher().getEventHandler().handle(
new NodesListManagerEvent(
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java Mon Jun 16 23:56:12 2014
@@ -20,19 +20,28 @@ package org.apache.hadoop.yarn.server.re
import java.util.List;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
public class RMNodeStartedEvent extends RMNodeEvent {
- private List<NMContainerStatus> containerReports;
+ private List<NMContainerStatus> containerStatuses;
+ private List<ApplicationId> runningApplications;
- public RMNodeStartedEvent(NodeId nodeId, List<NMContainerStatus> containerReports) {
+ public RMNodeStartedEvent(NodeId nodeId,
+ List<NMContainerStatus> containerStatuses,
+ List<ApplicationId> runningApplications) {
 super(nodeId, RMNodeEventType.STARTED);
- this.containerReports = containerReports;
+ this.containerStatuses = containerStatuses;
+ this.runningApplications = runningApplications;
 }
- public List<NMContainerStatus> getContainerRecoveryReports() {
- return this.containerReports;
+ public List<NMContainerStatus> getNMContainerStatuses() {
+ return this.containerStatuses;
+ }
+
+ public List<ApplicationId> getRunningApplications() {
+ return runningApplications;
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Mon Jun 16 23:56:12 2014
@@ -100,11 +100,17 @@ public class MockNM {
}
public RegisterNodeManagerResponse registerNode() throws Exception {
- return registerNode(null);
+ return registerNode(null, null);
+ }
+
+ public RegisterNodeManagerResponse registerNode(
+ List<ApplicationId> runningApplications) throws Exception {
+ return registerNode(null, runningApplications);
}
public RegisterNodeManagerResponse registerNode(
- List<NMContainerStatus> containerReports) throws Exception{
+ List<NMContainerStatus> containerReports,
+ List<ApplicationId> runningApplications) throws Exception {
RegisterNodeManagerRequest req = Records.newRecord(
RegisterNodeManagerRequest.class);
req.setNodeId(nodeId);
@@ -113,6 +119,7 @@ public class MockNM {
req.setResource(resource);
req.setContainerStatuses(containerReports);
req.setNMVersion(version);
+ req.setRunningApplications(runningApplications);
RegisterNodeManagerResponse registrationResponse =
resourceTracker.registerNodeManager(req);
this.currentContainerTokenMasterKey =
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java Mon Jun 16 23:56:12 2014
@@ -78,6 +78,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.Records;
+import org.apache.hadoop.yarn.util.YarnVersionInfo;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
@@ -350,11 +351,20 @@ public class MockRM extends ResourceMana
nm.registerNode();
return nm;
}
+
+ public MockNM registerNode(String nodeIdStr, int memory, int vCores,
+ List<ApplicationId> runningApplications) throws Exception {
+ ResourceTrackerService tracker = getResourceTrackerService();
+ MockNM nm = new MockNM(nodeIdStr, memory, vCores, tracker,
+ YarnVersionInfo.getVersion());
+ nm.registerNode(runningApplications); // also reports the running apps
+ return nm;
+ }
public void sendNodeStarted(MockNM nm) throws Exception {
RMNodeImpl node = (RMNodeImpl) getRMContext().getRMNodes().get(
nm.getNodeId());
- node.handle(new RMNodeStartedEvent(nm.getNodeId(), null));
+ node.handle(new RMNodeStartedEvent(nm.getNodeId(), null, null));
}
public void sendNodeLost(MockNM nm) throws Exception {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestApplicationCleanup.java Mon Jun 16 23:56:12 2014
@@ -18,26 +18,30 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.junit.Assert;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
@@ -45,13 +49,29 @@ import org.apache.hadoop.yarn.server.uti
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
+import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
public class TestApplicationCleanup {
private static final Log LOG = LogFactory
.getLog(TestApplicationCleanup.class);
+
+  /** Per-test configuration, rebuilt by {@link #setup()} before each test. */
+  private YarnConfiguration conf;
+
+  /**
+   * Builds a fresh configuration for each test: DEBUG root logging, RM
+   * recovery enabled, and MemoryRMStateStore as the RM state store.
+   *
+   * @throws UnknownHostException declared for compatibility; not thrown by
+   *     the statements below
+   */
+  @Before
+  public void setup() throws UnknownHostException {
+    Logger rootLogger = LogManager.getRootLogger();
+    rootLogger.setLevel(Level.DEBUG);
+    conf = new YarnConfiguration();
+    UserGroupInformation.setConfiguration(conf);
+    // Typed setter instead of the string literal "true".
+    conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
+    conf.set(YarnConfiguration.RM_STORE, MemoryRMStateStore.class.getName());
+    // Presumably guards that the default permits AM retries; tests needing a
+    // single attempt override RM_AM_MAX_ATTEMPTS explicitly.
+    Assert.assertTrue("Expected default RM AM max attempts to exceed 1",
+        YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
+  }
+ @SuppressWarnings("resource")
@Test
public void testAppCleanup() throws Exception {
Logger rootLogger = LogManager.getRootLogger();
@@ -130,6 +150,7 @@ public class TestApplicationCleanup {
rm.stop();
}
+ @SuppressWarnings("resource")
@Test
public void testContainerCleanup() throws Exception {
@@ -252,6 +273,69 @@ public class TestApplicationCleanup {
rm.stop();
}
+
+  /**
+   * Heartbeats {@code nm} until a response lists {@code appId} among the
+   * applications to clean up. Gives up after a bounded number of heartbeats
+   * and fails the test with a descriptive message, instead of spinning until
+   * the JUnit timeout kills the test.
+   *
+   * @param nm node manager whose heartbeats are polled
+   * @param appId application expected in the cleanup list
+   * @throws Exception if a heartbeat fails or the wait is interrupted
+   */
+  private void waitForAppCleanupMessageRecved(MockNM nm, ApplicationId appId)
+      throws Exception {
+    // ~30s at one heartbeat per second; stays under the 60s test timeout.
+    final int maxAttempts = 30;
+    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
+      NodeHeartbeatResponse response = nm.nodeHeartbeat(true);
+      List<ApplicationId> toCleanup = response.getApplicationsToCleanup();
+      if (toCleanup != null && toCleanup.contains(appId)) {
+        return;
+      }
+
+      LOG.info("Haven't got application=" + appId
+          + " in cleanup list from node heartbeat response, "
+          + "sleep for a while before next heartbeat");
+      Thread.sleep(1000);
+    }
+    Assert.fail("Application " + appId + " not in cleanup list after "
+        + maxAttempts + " node heartbeats");
+  }
+
+  /**
+   * Drives the current attempt of {@code app} until its AM has registered and
+   * the application has reached RUNNING.
+   *
+   * @param app application whose current attempt is launched
+   * @param rm RM used to mark the AM launched and to wait on app state
+   * @param nm node manager that heartbeats before the launch
+   * @return the launched and registered AM
+   * @throws Exception if any step of the launch fails
+   */
+  private MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
+      throws Exception {
+    RMAppAttempt currentAttempt = app.getCurrentAppAttempt();
+    // Heartbeat before launching; presumably this lets the scheduler place
+    // the AM container on nm.
+    nm.nodeHeartbeat(true);
+    MockAM launchedAM = rm.sendAMLaunched(currentAttempt.getAppAttemptId());
+    launchedAM.registerAppAttempt();
+    rm.waitForState(app.getApplicationId(), RMAppState.RUNNING);
+    return launchedAM;
+  }
+
+  /**
+   * YARN-1885 regression test. An application that failed before an RM
+   * restart must still be sent to the NM for cleanup when that NM
+   * re-registers with the new RM and reports the application as running.
+   * Both RMs are stopped in finally blocks so a failed assertion or wait does
+   * not leak running RM services into later tests.
+   */
+  @SuppressWarnings("resource")
+  @Test (timeout = 60000)
+  public void testAppCleanupWhenRestartedAfterAppFinished() throws Exception {
+    // A single attempt means the AM container's completion fails the app.
+    conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+
+    // start RM
+    MockRM rm1 = new MockRM(conf, memStore);
+    MockRM rm2 = null;
+    rm1.start();
+    try {
+      MockNM nm1 =
+          new MockNM("127.0.0.1:1234", 15120, rm1.getResourceTrackerService());
+      nm1.registerNode();
+
+      // create app and launch the AM
+      RMApp app0 = rm1.submitApp(200);
+      MockAM am0 = launchAM(app0, rm1, nm1);
+      nm1.nodeHeartbeat(am0.getApplicationAttemptId(), 1,
+          ContainerState.COMPLETE);
+      rm1.waitForState(app0.getApplicationId(), RMAppState.FAILED);
+
+      // start new RM on the same state store
+      rm2 = new MockRM(conf, memStore);
+      rm2.start();
+
+      // nm1 registers to rm2, reporting app0 among its running applications
+      nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+      nm1.registerNode(Arrays.asList(app0.getApplicationId()));
+      rm2.waitForState(app0.getApplicationId(), RMAppState.FAILED);
+
+      // wait for application cleanup message received
+      waitForAppCleanupMessageRecved(nm1, app0.getApplicationId());
+    } finally {
+      try {
+        rm1.stop();
+      } finally {
+        if (rm2 != null) {
+          rm2.stop();
+        }
+      }
+    }
+  }
public static void main(String[] args) throws Exception {
TestApplicationCleanup t = new TestApplicationCleanup();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMNodeTransitions.java Mon Jun 16 23:56:12 2014
@@ -161,7 +161,7 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000)
public void testExpiredContainer() {
// Start the node
- node.handle(new RMNodeStartedEvent(null, null));
+ node.handle(new RMNodeStartedEvent(null, null, null));
verify(scheduler).handle(any(NodeAddedSchedulerEvent.class));
// Expire a container
@@ -189,11 +189,11 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000)
public void testContainerUpdate() throws InterruptedException{
//Start the node
- node.handle(new RMNodeStartedEvent(null, null));
+ node.handle(new RMNodeStartedEvent(null, null, null));
NodeId nodeId = BuilderUtils.newNodeId("localhost:1", 1);
RMNodeImpl node2 = new RMNodeImpl(nodeId, rmContext, null, 0, 0, null, null, null);
- node2.handle(new RMNodeStartedEvent(null, null));
+ node2.handle(new RMNodeStartedEvent(null, null, null));
ContainerId completedContainerIdFromNode1 = BuilderUtils.newContainerId(
BuilderUtils.newApplicationAttemptId(
@@ -249,7 +249,7 @@ public class TestRMNodeTransitions {
@Test (timeout = 5000)
public void testStatusChange(){
//Start the node
- node.handle(new RMNodeStartedEvent(null, null));
+ node.handle(new RMNodeStartedEvent(null, null, null));
//Add info to the queue first
node.setNextHeartBeat(false);
@@ -465,7 +465,7 @@ public class TestRMNodeTransitions {
RMNodeImpl node = new RMNodeImpl(nodeId, rmContext,null, 0, 0,
null, ResourceOption.newInstance(capability,
RMNode.OVER_COMMIT_TIMEOUT_MILLIS_DEFAULT), nmVersion);
- node.handle(new RMNodeStartedEvent(node.getNodeID(), null));
+ node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
Assert.assertEquals(NodeState.RUNNING, node.getState());
return node;
}
@@ -496,7 +496,7 @@ public class TestRMNodeTransitions {
int initialUnhealthy = cm.getUnhealthyNMs();
int initialDecommissioned = cm.getNumDecommisionedNMs();
int initialRebooted = cm.getNumRebootedNMs();
- node.handle(new RMNodeStartedEvent(node.getNodeID(), null));
+ node.handle(new RMNodeStartedEvent(node.getNodeID(), null, null));
Assert.assertEquals("Active Nodes", initialActive + 1, cm.getNumActiveNMs());
Assert.assertEquals("Lost Nodes", initialLost, cm.getNumLostNMs());
Assert.assertEquals("Unhealthy Nodes",
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Mon Jun 16 23:56:12 2014
@@ -102,7 +102,6 @@ import org.junit.Before;
import org.junit.Test;
public class TestRMRestart {
-
private final static File TEMP_DIR = new File(System.getProperty(
"test.build.data", "/tmp"), "decommision");
private File hostFile = new File(TEMP_DIR + File.separator + "hostFile.txt");
@@ -309,7 +308,7 @@ public class TestRMRestart {
TestRMRestart
.createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
.getAppAttemptId(), 1, ContainerState.COMPLETE);
- nm1.registerNode(Arrays.asList(status));
+ nm1.registerNode(Arrays.asList(status), null);
nm2.registerNode();
rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
@@ -392,7 +391,7 @@ public class TestRMRestart {
// completed apps are not removed immediately after app finish
// And finished app is also loaded back.
Assert.assertEquals(4, rmAppState.size());
- }
+ }
@Test (timeout = 60000)
public void testRMRestartAppRunningAMFailed() throws Exception {
@@ -514,7 +513,7 @@ public class TestRMRestart {
NMContainerStatus status =
TestRMRestart.createNMContainerStatus(
am2.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
- nm1.registerNode(Arrays.asList(status));
+ nm1.registerNode(Arrays.asList(status), null);
rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
launchAM(rmApp, rm2, nm1);
Assert.assertEquals(3, rmApp.getAppAttempts().size());
@@ -1680,7 +1679,8 @@ public class TestRMRestart {
TestRMRestart
.createNMContainerStatus(loadedApp1.getCurrentAppAttempt()
.getAppAttemptId(), 1, ContainerState.COMPLETE);
- nm1.registerNode(Arrays.asList(status));
+ nm1.registerNode(Arrays.asList(status), null);
+
while (loadedApp1.getAppAttempts().size() != 2) {
Thread.sleep(200);
}
@@ -1807,7 +1807,7 @@ public class TestRMRestart {
NMContainerStatus status =
TestRMRestart.createNMContainerStatus(
am0.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
- nm1.registerNode(Arrays.asList(status));
+ nm1.registerNode(Arrays.asList(status), null);
}
};
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java Mon Jun 16 23:56:12 2014
@@ -159,7 +159,7 @@ public class TestWorkPreservingRMRestart
ContainerState.COMPLETE);
nm1.registerNode(Arrays.asList(amContainer, runningContainer,
- completedContainer));
+ completedContainer), null);
// Wait for RM to settle down on recovering containers;
waitForNumContainersToRecover(2, rm2, am1.getApplicationAttemptId());
@@ -383,11 +383,11 @@ public class TestWorkPreservingRMRestart
List<NMContainerStatus> am1_2Containers =
createNMContainerStatusForApp(am1_2);
am1_1Containers.addAll(am1_2Containers);
- nm1.registerNode(am1_1Containers);
+ nm1.registerNode(am1_1Containers, null);
List<NMContainerStatus> am2Containers =
createNMContainerStatusForApp(am2);
- nm2.registerNode(am2Containers);
+ nm2.registerNode(am2Containers, null);
// Wait for RM to settle down on recovering containers;
waitForNumContainersToRecover(2, rm2, am1_1.getApplicationAttemptId());
@@ -482,7 +482,7 @@ public class TestWorkPreservingRMRestart
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
ContainerState.COMPLETE);
nm1.registerNode(Arrays.asList(amContainer, runningContainer,
- completedContainer));
+ completedContainer), null);
rm2.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED);
// Wait for RM to settle down on recovering containers;
Thread.sleep(3000);
@@ -519,7 +519,7 @@ public class TestWorkPreservingRMRestart
NMContainerStatus completedContainer =
TestRMRestart.createNMContainerStatus(am1.getApplicationAttemptId(), 3,
ContainerState.COMPLETE);
- nm1.registerNode(Arrays.asList(runningContainer, completedContainer));
+ nm1.registerNode(Arrays.asList(runningContainer, completedContainer), null);
RMApp recoveredApp1 =
rm2.getRMContext().getRMApps().get(app1.getApplicationId());
assertEquals(RMAppState.FINISHED, recoveredApp1.getState());
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java Mon Jun 16 23:56:12 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -159,6 +160,11 @@ public abstract class MockAsm extends Mo
public YarnApplicationState createApplicationState() {
throw new UnsupportedOperationException("Not supported yet.");
}
+
+    /**
+     * Not implemented by this mock application.
+     *
+     * @throws UnsupportedOperationException always
+     */
+    @Override
+    public Set<NodeId> getRanNodes() {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
}
public static RMApp newApplication(int i) {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java?rev=1603028&r1=1603027&r2=1603028&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java Mon Jun 16 23:56:12 2014
@@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -232,4 +233,9 @@ public class MockRMApp implements RMApp
public YarnApplicationState createApplicationState() {
return null;
}
+
+  /**
+   * Returns an empty, immutable set because this mock does not track the
+   * nodes its attempts ran on. An empty set rather than {@code null} keeps
+   * any caller that iterates the result from failing with a
+   * NullPointerException.
+   *
+   * @return an empty set of node ids
+   */
+  @Override
+  public Set<NodeId> getRanNodes() {
+    return java.util.Collections.<NodeId>emptySet();
+  }
}