You are viewing a plain-text version of this content. The canonical (HTML) version is linked from the original mailing-list archive page.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/11/19 06:17:21 UTC
svn commit: r1543310 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/sr...
Author: vinodkv
Date: Tue Nov 19 05:17:20 2013
New Revision: 1543310
URL: http://svn.apache.org/r1543310
Log:
YARN-1210. Changed RM to start new app-attempts on RM restart only after ensuring that previous AM exited or after expiry time. Contributed by Omkar Vinit Joshi.
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Nov 19 05:17:20 2013
@@ -103,6 +103,10 @@ Release 2.3.0 - UNRELEASED
YARN-709. Added tests to verify validity of delegation tokens and logging of
appsummary after RM restart. (Jian He via vinodkv)
+ YARN-1210. Changed RM to start new app-attempts on RM restart only after
+ ensuring that previous AM exited or after expiry time. (Omkar Vinit Joshi via
+ vinodkv)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java Tue Nov 19 05:17:20 2013
@@ -20,15 +20,29 @@ package org.apache.hadoop.yarn.server.ap
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.util.Records;
-public interface NodeHeartbeatRequest {
+public abstract class NodeHeartbeatRequest {
+
+ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
+ MasterKey lastKnownContainerTokenMasterKey,
+ MasterKey lastKnownNMTokenMasterKey) {
+ NodeHeartbeatRequest nodeHeartbeatRequest =
+ Records.newRecord(NodeHeartbeatRequest.class);
+ nodeHeartbeatRequest.setNodeStatus(nodeStatus);
+ nodeHeartbeatRequest
+ .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
+ nodeHeartbeatRequest
+ .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
+ return nodeHeartbeatRequest;
+ }
- NodeStatus getNodeStatus();
- void setNodeStatus(NodeStatus status);
+ public abstract NodeStatus getNodeStatus();
+ public abstract void setNodeStatus(NodeStatus status);
- MasterKey getLastKnownContainerTokenMasterKey();
- void setLastKnownContainerTokenMasterKey(MasterKey secretKey);
+ public abstract MasterKey getLastKnownContainerTokenMasterKey();
+ public abstract void setLastKnownContainerTokenMasterKey(MasterKey secretKey);
- MasterKey getLastKnownNMTokenMasterKey();
- void setLastKnownNMTokenMasterKey(MasterKey secretKey);
+ public abstract MasterKey getLastKnownNMTokenMasterKey();
+ public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey);
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java Tue Nov 19 05:17:20 2013
@@ -18,17 +18,37 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
-public interface RegisterNodeManagerRequest {
- NodeId getNodeId();
- int getHttpPort();
- Resource getResource();
- String getNMVersion();
+public abstract class RegisterNodeManagerRequest {
+
+ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
+ int httpPort, Resource resource, String nodeManagerVersionId,
+ List<ContainerStatus> containerStatuses) {
+ RegisterNodeManagerRequest request =
+ Records.newRecord(RegisterNodeManagerRequest.class);
+ request.setHttpPort(httpPort);
+ request.setResource(resource);
+ request.setNodeId(nodeId);
+ request.setNMVersion(nodeManagerVersionId);
+ request.setContainerStatuses(containerStatuses);
+ return request;
+ }
+
+ public abstract NodeId getNodeId();
+ public abstract int getHttpPort();
+ public abstract Resource getResource();
+ public abstract String getNMVersion();
+ public abstract List<ContainerStatus> getContainerStatuses();
- void setNodeId(NodeId nodeId);
- void setHttpPort(int port);
- void setResource(Resource resource);
- void setNMVersion(String version);
+ public abstract void setNodeId(NodeId nodeId);
+ public abstract void setHttpPort(int port);
+ public abstract void setResource(Resource resource);
+ public abstract void setNMVersion(String version);
+ public abstract void setContainerStatuses(List<ContainerStatus> containerStatuses);
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java Tue Nov 19 05:17:20 2013
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
@@ -29,8 +28,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
-public class NodeHeartbeatRequestPBImpl extends
- ProtoBase<NodeHeartbeatRequestProto> implements NodeHeartbeatRequest {
+public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
NodeHeartbeatRequestProto proto = NodeHeartbeatRequestProto.getDefaultInstance();
NodeHeartbeatRequestProto.Builder builder = null;
boolean viaProto = false;
@@ -55,6 +53,21 @@ public class NodeHeartbeatRequestPBImpl
return proto;
}
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
private void mergeLocalToBuilder() {
if (this.nodeStatus != null) {
builder.setNodeStatus(convertToProtoFormat(this.nodeStatus));
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java Tue Nov 19 05:17:20 2013
@@ -19,11 +19,21 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
@@ -32,13 +42,14 @@ import org.apache.hadoop.yarn.server.api
-public class RegisterNodeManagerRequestPBImpl extends ProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {
+public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
RegisterNodeManagerRequestProto.Builder builder = null;
boolean viaProto = false;
private Resource resource = null;
private NodeId nodeId = null;
+ private List<ContainerStatus> containerStatuses = null;
public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder();
@@ -57,6 +68,9 @@ public class RegisterNodeManagerRequestP
}
private void mergeLocalToBuilder() {
+ if (this.containerStatuses != null) {
+ addContainerStatusesToProto();
+ }
if (this.resource != null) {
builder.setResource(convertToProtoFormat(this.resource));
}
@@ -140,6 +154,81 @@ public class RegisterNodeManagerRequestP
}
@Override
+ public List<ContainerStatus> getContainerStatuses() {
+ initContainerStatuses();
+ return containerStatuses;
+ }
+
+ private void initContainerStatuses() {
+ if (this.containerStatuses != null) {
+ return;
+ }
+ RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
+ List<ContainerStatusProto> list = p.getContainerStatusesList();
+ this.containerStatuses = new ArrayList<ContainerStatus>();
+ for (ContainerStatusProto c : list) {
+ this.containerStatuses.add(convertFromProtoFormat(c));
+ }
+ }
+
+ @Override
+ public void setContainerStatuses(List<ContainerStatus> containers) {
+ if (containers == null) {
+ return;
+ }
+ initContainerStatuses();
+ this.containerStatuses.addAll(containers);
+ }
+
+ private void addContainerStatusesToProto() {
+ maybeInitBuilder();
+ builder.clearContainerStatuses();
+ if (containerStatuses == null) {
+ return;
+ }
+ Iterable<ContainerStatusProto> it = new Iterable<ContainerStatusProto>() {
+
+ @Override
+ public Iterator<ContainerStatusProto> iterator() {
+ return new Iterator<ContainerStatusProto>() {
+ Iterator<ContainerStatus> iter = containerStatuses.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public ContainerStatusProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllContainerStatuses(it);
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
public String getNMVersion() {
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasNmVersion()) {
@@ -170,6 +259,11 @@ public class RegisterNodeManagerRequestP
return ((ResourcePBImpl)t).getProto();
}
-
-
-}
+ private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) {
+ return new ContainerStatusPBImpl(c);
+ }
+
+ private ContainerStatusProto convertToProtoFormat(ContainerStatus c) {
+ return ((ContainerStatusPBImpl)c).getProto();
+ }
+}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java Tue Nov 19 05:17:20 2013
@@ -22,10 +22,24 @@ import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.Records;
-public interface NodeStatus {
+public abstract class NodeStatus {
+ public static NodeStatus newInstance(NodeId nodeId, int responseId,
+ List<ContainerStatus> containerStatuses,
+ List<ApplicationId> keepAliveApplications,
+ NodeHealthStatus nodeHealthStatus) {
+ NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
+ nodeStatus.setResponseId(responseId);
+ nodeStatus.setNodeId(nodeId);
+ nodeStatus.setContainersStatuses(containerStatuses);
+ nodeStatus.setKeepAliveApplications(keepAliveApplications);
+ nodeStatus.setNodeHealthStatus(nodeHealthStatus);
+ return nodeStatus;
+ }
+
public abstract NodeId getNodeId();
public abstract int getResponseId();
@@ -36,8 +50,8 @@ public interface NodeStatus {
public abstract List<ApplicationId> getKeepAliveApplications();
public abstract void setKeepAliveApplications(List<ApplicationId> appIds);
- NodeHealthStatus getNodeHealthStatus();
- void setNodeHealthStatus(NodeHealthStatus healthStatus);
+ public abstract NodeHealthStatus getNodeHealthStatus();
+ public abstract void setNodeHealthStatus(NodeHealthStatus healthStatus);
public abstract void setNodeId(NodeId nodeId);
public abstract void setResponseId(int responseId);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java Tue Nov 19 05:17:20 2013
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@@ -40,8 +39,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
-public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements
- NodeStatus {
+public class NodeStatusPBImpl extends NodeStatus {
NodeStatusProto proto = NodeStatusProto.getDefaultInstance();
NodeStatusProto.Builder builder = null;
boolean viaProto = false;
@@ -167,6 +165,21 @@ public class NodeStatusPBImpl extends Pr
}
@Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
public synchronized int getResponseId() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
return p.getResponseId();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto Tue Nov 19 05:17:20 2013
@@ -30,6 +30,7 @@ message RegisterNodeManagerRequestProto
optional int32 http_port = 3;
optional ResourceProto resource = 4;
optional string nm_version = 5;
+ repeated ContainerStatusProto containerStatuses = 6;
}
message RegisterNodeManagerResponseProto {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java Tue Nov 19 05:17:20 2013
@@ -26,7 +26,7 @@ public interface NodeStatusUpdater exten
void sendOutofBandHeartBeat();
- NodeStatus getNodeStatusAndUpdateContainersInContext();
+ NodeStatus getNodeStatusAndUpdateContainersInContext(int responseId);
long getRMIdentifier();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Tue Nov 19 05:17:20 2013
@@ -48,8 +48,6 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
@@ -89,7 +87,6 @@ public class NodeStatusUpdaterImpl exten
private String nodeManagerVersionId;
private String minimumResourceManagerVersion;
private volatile boolean isStopped;
- private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private boolean tokenKeepAliveEnabled;
private long tokenRemovalDelayMs;
/** Keeps track of when the next keep alive request should be sent for an app*/
@@ -134,9 +131,7 @@ public class NodeStatusUpdaterImpl exten
conf.getInt(
YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
- this.totalResource = recordFactory.newRecordInstance(Resource.class);
- this.totalResource.setMemory(memoryMb);
- this.totalResource.setVirtualCores(virtualCores);
+ this.totalResource = Resource.newInstance(memoryMb, virtualCores);
metrics.addResource(totalResource);
this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
this.tokenRemovalDelayMs =
@@ -238,13 +233,17 @@ public class NodeStatusUpdaterImpl exten
}
@VisibleForTesting
- protected void registerWithRM() throws YarnException, IOException {
+ protected void registerWithRM()
+ throws YarnException, IOException {
+ List<ContainerStatus> containerStatuses =
+ this.updateAndGetContainerStatuses();
RegisterNodeManagerRequest request =
- recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
- request.setHttpPort(this.httpPort);
- request.setResource(this.totalResource);
- request.setNodeId(this.nodeId);
- request.setNMVersion(this.nodeManagerVersionId);
+ RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
+ nodeManagerVersionId, containerStatuses);
+ if (containerStatuses != null) {
+ LOG.info("Registering with RM using finished containers :"
+ + containerStatuses);
+ }
RegisterNodeManagerResponse regNMResponse =
resourceTracker.registerNodeManager(request);
this.rmIdentifier = regNMResponse.getRMIdentifier();
@@ -323,13 +322,33 @@ public class NodeStatusUpdaterImpl exten
}
@Override
- public NodeStatus getNodeStatusAndUpdateContainersInContext() {
+ public NodeStatus getNodeStatusAndUpdateContainersInContext(
+ int responseId) {
- NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
- nodeStatus.setNodeId(this.nodeId);
+ NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
+ nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
+ nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
+ nodeHealthStatus.setLastHealthReportTime(
+ healthChecker.getLastHealthReportTime());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
+ + ", " + nodeHealthStatus.getHealthReport());
+ }
+ List<ContainerStatus> containersStatuses = updateAndGetContainerStatuses();
+ LOG.debug(this.nodeId + " sending out status for "
+ + containersStatuses.size() + " containers");
+ NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId,
+ containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus);
- int numActiveContainers = 0;
- List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
+ return nodeStatus;
+ }
+
+ /*
+ * It will return current container statuses. If any container has
+ * COMPLETED then it will be removed from context.
+ */
+ private List<ContainerStatus> updateAndGetContainerStatuses() {
+ List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
for (Iterator<Entry<ContainerId, Container>> i =
this.context.getContainers().entrySet().iterator(); i.hasNext();) {
Entry<ContainerId, Container> e = i.next();
@@ -339,8 +358,7 @@ public class NodeStatusUpdaterImpl exten
// Clone the container to send it to the RM
org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
container.cloneAndGetContainerStatus();
- containersStatuses.add(containerStatus);
- ++numActiveContainers;
+ containerStatuses.add(containerStatus);
if (LOG.isDebugEnabled()) {
LOG.debug("Sending out status for container: " + containerStatus);
}
@@ -356,26 +374,7 @@ public class NodeStatusUpdaterImpl exten
LOG.info("Removed completed container " + containerId);
}
}
- nodeStatus.setContainersStatuses(containersStatuses);
-
- LOG.debug(this.nodeId + " sending out status for "
- + numActiveContainers + " containers");
-
- NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
- nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
- nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
- nodeHealthStatus.setLastHealthReportTime(
- healthChecker.getLastHealthReportTime());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
- + ", " + nodeHealthStatus.getHealthReport());
- }
- nodeStatus.setNodeHealthStatus(nodeHealthStatus);
-
- List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
- nodeStatus.setKeepAliveApplications(keepAliveAppIds);
-
- return nodeStatus;
+ return containerStatuses;
}
private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
@@ -458,18 +457,15 @@ public class NodeStatusUpdaterImpl exten
// Send heartbeat
try {
NodeHeartbeatResponse response = null;
- NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext();
- nodeStatus.setResponseId(lastHeartBeatID);
+ NodeStatus nodeStatus =
+ getNodeStatusAndUpdateContainersInContext(lastHeartBeatID);
- NodeHeartbeatRequest request = recordFactory
- .newRecordInstance(NodeHeartbeatRequest.class);
- request.setNodeStatus(nodeStatus);
- request
- .setLastKnownContainerTokenMasterKey(NodeStatusUpdaterImpl.this.context
- .getContainerTokenSecretManager().getCurrentKey());
- request
- .setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context
- .getNMTokenSecretManager().getCurrentKey());
+ NodeHeartbeatRequest request =
+ NodeHeartbeatRequest.newInstance(nodeStatus,
+ NodeStatusUpdaterImpl.this.context
+ .getContainerTokenSecretManager().getCurrentKey(),
+ NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
+ .getCurrentKey());
response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Tue Nov 19 05:17:20 2013
@@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -64,6 +65,7 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SerializedException;
@@ -371,17 +373,31 @@ public class ContainerManagerImpl extend
this.handle(new CMgrCompletedContainersEvent(containerIds,
CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC));
- while (!containers.isEmpty()) {
- try {
- Thread.sleep(1000);
- nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
- } catch (InterruptedException ex) {
- LOG.warn("Interrupted while sleeping on container kill on resync", ex);
+
+ /*
+ * We will wait till all the containers change their state to COMPLETE. We
+ * will not remove the container statuses from nm context because these
+ * are used while re-registering node manager with resource manager.
+ */
+ boolean allContainersCompleted = false;
+ while (!containers.isEmpty() && !allContainersCompleted) {
+ allContainersCompleted = true;
+ for (Entry<ContainerId, Container> container : containers.entrySet()) {
+ if (((ContainerImpl) container.getValue()).getCurrentState()
+ != ContainerState.COMPLETE) {
+ allContainersCompleted = false;
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ LOG.warn("Interrupted while sleeping on container kill on resync",
+ ex);
+ }
+ break;
+ }
}
}
-
// All containers killed
- if (containers.isEmpty()) {
+ if (allContainersCompleted) {
LOG.info("All containers in DONE state");
} else {
LOG.info("Done waiting for containers to be killed. Still alive: " +
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Tue Nov 19 05:17:20 2013
@@ -302,7 +302,7 @@ public class ContainerImpl implements Co
private final StateMachine<ContainerState, ContainerEventType, ContainerEvent>
stateMachine;
- private org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
+ public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
switch (stateMachine.getCurrentState()) {
case NEW:
case LOCALIZING:
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Tue Nov 19 05:17:20 2013
@@ -29,6 +29,10 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
@@ -46,14 +50,17 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.util.RackResolver;
@@ -183,6 +190,33 @@ public class ResourceTrackerService exte
Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion();
+ if (!request.getContainerStatuses().isEmpty()) {
+ LOG.info("received container statuses on node manager register :"
+ + request.getContainerStatuses());
+ for (ContainerStatus containerStatus : request.getContainerStatuses()) {
+ ApplicationAttemptId appAttemptId =
+ containerStatus.getContainerId().getApplicationAttemptId();
+ RMApp rmApp =
+ rmContext.getRMApps().get(appAttemptId.getApplicationId());
+ if (rmApp != null) {
+ RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
+ if (rmAppAttempt.getMasterContainer().getId()
+ .equals(containerStatus.getContainerId())
+ && containerStatus.getState() == ContainerState.COMPLETE) {
+ // sending master container finished event.
+ RMAppAttemptContainerFinishedEvent evt =
+ new RMAppAttemptContainerFinishedEvent(appAttemptId,
+ containerStatus);
+ rmContext.getDispatcher().getEventHandler().handle(evt);
+ }
+ } else {
+ LOG.error("Received finished container :"
+ + containerStatus.getContainerId()
+ + " for non existing application :"
+ + appAttemptId.getApplicationId());
+ }
+ }
+ }
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Tue Nov 19 05:17:20 2013
@@ -132,8 +132,8 @@ public class RMAppImpl implements RMApp,
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppSavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
- RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED,
- RMAppState.FINAL_SAVING),
+ RMAppState.RUNNING, RMAppState.FINISHED, RMAppState.FAILED,
+ RMAppState.KILLED, RMAppState.FINAL_SAVING),
RMAppEventType.RECOVER, new RMAppRecoveredTransition())
.addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL,
new FinalSavingTransition(
@@ -611,11 +611,11 @@ public class RMAppImpl implements RMApp,
this.diagnostics.append(appState.getDiagnostics());
this.storedFinishTime = appState.getFinishTime();
this.startTime = appState.getStartTime();
+
for(int i=0; i<appState.getAttemptCount(); ++i) {
// create attempt
createNewAttempt(false);
- // recover attempt
- ((RMAppAttemptImpl) currentAttempt).recover(state);
+ ((RMAppAttemptImpl)this.currentAttempt).recover(state);
}
}
@@ -656,30 +656,35 @@ public class RMAppImpl implements RMApp,
};
}
+ @SuppressWarnings("unchecked")
private static final class RMAppRecoveredTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
+
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
- if (app.recoveredFinalState != null) {
- FINAL_TRANSITION.transition(app, event);
- return app.recoveredFinalState;
- }
- // Directly call AttemptFailedTransition, since now we deem that an
- // application fails because of RM restart as a normal AM failure.
-
- // Do not recover unmanaged applications since current recovery
- // mechanism of restarting attempts does not work for them.
- // This will need to be changed in work preserving recovery in which
- // RM will re-connect with the running AM's instead of restarting them
-
- // In work-preserve restart, if attemptCount == maxAttempts, the job still
- // needs to be recovered because the last attempt may still be running.
-
- // As part of YARN-1210, we may return ACCECPTED state waiting for AM to
- // reregister or fail and remove the following code.
- return new AttemptFailedTransition(RMAppState.SUBMITTED).transition(app,
- event);
+ if (app.attempts.isEmpty()) {
+ // Saved application was not running any attempts.
+ app.createNewAttempt(true);
+ return RMAppState.SUBMITTED;
+ } else {
+ /*
+ * If the last attempt's recovered final state is null, the attempt
+ * was started but its AM container may or may not have started/finished,
+ * so we must wait for it to finish.
+ */
+ for (RMAppAttempt attempt : app.getAppAttempts().values()) {
+ app.dispatcher.getEventHandler().handle(
+ new RMAppAttemptEvent(attempt.getAppAttemptId(),
+ RMAppAttemptEventType.RECOVER));
+ }
+ if (app.recoveredFinalState != null) {
+ FINAL_TRANSITION.transition(app, event);
+ return app.recoveredFinalState;
+ } else {
+ return RMAppState.RUNNING;
+ }
+ }
}
}
@@ -1017,4 +1022,10 @@ public class RMAppImpl implements RMApp,
throw new YarnRuntimeException("Unknown state passed!");
}
}
+
+ public static boolean isAppInFinalState(RMApp rmApp) {
+ RMAppState appState = rmApp.getState();
+ return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
+ || appState == RMAppState.KILLED;
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Tue Nov 19 05:17:20 2013
@@ -68,11 +68,14 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
@@ -179,7 +182,7 @@ public class RMAppAttemptImpl implements
new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
.addTransition( RMAppAttemptState.NEW,
EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED,
- RMAppAttemptState.FAILED, RMAppAttemptState.RECOVERED),
+ RMAppAttemptState.FAILED, RMAppAttemptState.LAUNCHED),
RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
// Transitions from SUBMITTED state
@@ -386,25 +389,6 @@ public class RMAppAttemptImpl implements
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.STATUS_UPDATE))
-
- // Transitions from RECOVERED State
- .addTransition(
- RMAppAttemptState.RECOVERED,
- RMAppAttemptState.RECOVERED,
- EnumSet.of(RMAppAttemptEventType.START,
- RMAppAttemptEventType.APP_ACCEPTED,
- RMAppAttemptEventType.APP_REJECTED,
- RMAppAttemptEventType.EXPIRE,
- RMAppAttemptEventType.LAUNCHED,
- RMAppAttemptEventType.LAUNCH_FAILED,
- RMAppAttemptEventType.REGISTERED,
- RMAppAttemptEventType.CONTAINER_ALLOCATED,
- RMAppAttemptEventType.CONTAINER_ACQUIRED,
- RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
- RMAppAttemptEventType.CONTAINER_FINISHED,
- RMAppAttemptEventType.UNREGISTERED,
- RMAppAttemptEventType.KILL,
- RMAppAttemptEventType.STATUS_UPDATE))
.installTopology();
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
@@ -694,8 +678,6 @@ public class RMAppAttemptImpl implements
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
this.finalStatus = attemptState.getFinalApplicationStatus();
this.startTime = attemptState.getStartTime();
- handle(new RMAppAttemptEvent(getAppAttemptId(),
- RMAppAttemptEventType.RECOVER));
}
private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
@@ -865,11 +847,38 @@ public class RMAppAttemptImpl implements
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
+ LOG.info("Recovering attempt : recoverdFinalState :"
+ + appAttempt.recoveredFinalState);
if (appAttempt.recoveredFinalState != null) {
appAttempt.progress = 1.0f;
+ RMApp rmApp =appAttempt.rmContext.getRMApps().get(
+ appAttempt.getAppAttemptId().getApplicationId());
+ // We will replay the final attempt only if last attempt is in final
+ // state but application is not in final state.
+ if (rmApp.getCurrentAppAttempt() == appAttempt
+ && !RMAppImpl.isAppInFinalState(rmApp)) {
+ (new BaseFinalTransition(appAttempt.recoveredFinalState)).transition(
+ appAttempt, event);
+ }
return appAttempt.recoveredFinalState;
} else {
- return RMAppAttemptState.RECOVERED;
+ /*
+ * Since the application attempt's final state was not saved, the AM
+ * container of this (previous) attempt must be in one of these states:
+ * 1) The AM container may not have been launched (the RM failed right
+ * before launching it).
+ * 2) The AM container was launched successfully but may or may not have
+ * registered / unregistered.
+ * In either case we wait (by moving the attempt into the LAUNCHED
+ * state) and mark this attempt failed (assuming a non-work-preserving
+ * restart) only after either
+ * 1) the node manager reports during re-registration that the AM
+ * container has finished, or
+ * 2) AMLivelinessMonitor expires this attempt (because the AM does not
+ * heartbeat back).
+ */
+ (new AMLaunchedTransition()).transition(appAttempt, event);
+ return RMAppAttemptState.LAUNCHED;
}
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java Tue Nov 19 05:17:20 2013
@@ -20,6 +20,5 @@ package org.apache.hadoop.yarn.server.re
public enum RMAppAttemptState {
NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING,
- FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED,
- FINAL_SAVING
+ FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, FINAL_SAVING
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java Tue Nov 19 05:17:20 2013
@@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -31,6 +32,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
@@ -98,21 +100,27 @@ public class MockNM {
}
public RegisterNodeManagerResponse registerNode() throws Exception {
+ return registerNode(null);
+ }
+
+ public RegisterNodeManagerResponse registerNode(
+ List<ContainerStatus> containerStatus) throws Exception{
RegisterNodeManagerRequest req = Records.newRecord(
RegisterNodeManagerRequest.class);
req.setNodeId(nodeId);
req.setHttpPort(httpPort);
Resource resource = BuilderUtils.newResource(memory, vCores);
req.setResource(resource);
+ req.setContainerStatuses(containerStatus);
req.setNMVersion(version);
RegisterNodeManagerResponse registrationResponse =
resourceTracker.registerNodeManager(req);
this.currentContainerTokenMasterKey =
registrationResponse.getContainerTokenMasterKey();
this.currentNMTokenMasterKey = registrationResponse.getNMTokenMasterKey();
- return registrationResponse;
+ return registrationResponse;
}
-
+
public NodeHeartbeatResponse nodeHeartbeat(boolean isHealthy) throws Exception {
return nodeHeartbeat(new HashMap<ApplicationId, List<ContainerStatus>>(),
isHealthy, ++responseId);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java Tue Nov 19 05:17:20 2013
@@ -34,6 +34,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
@@ -62,10 +63,14 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AsyncDispatcher;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
@@ -88,6 +93,7 @@ import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.mortbay.log.Log;
public class TestRMRestart {
@@ -109,6 +115,7 @@ public class TestRMRestart {
Assert.assertTrue(YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS > 1);
}
+ @SuppressWarnings("rawtypes")
@Test (timeout=180000)
public void testRMRestart() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
@@ -257,11 +264,14 @@ public class TestRMRestart {
.getApplicationId());
// verify state machine kicked into expected states
- rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
+ rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.RUNNING);
rm2.waitForState(loadedApp2.getApplicationId(), RMAppState.ACCEPTED);
- // verify new attempts created
- Assert.assertEquals(2, loadedApp1.getAppAttempts().size());
+ // verify attempts for apps
+ // An app whose AM was started waits for the previous AM container's
+ // finish event to arrive, whereas an app for which no AM container
+ // was running starts a new application attempt.
+ Assert.assertEquals(1, loadedApp1.getAppAttempts().size());
Assert.assertEquals(1, loadedApp2.getAppAttempts().size());
// verify old AM is not accepted
@@ -279,8 +289,20 @@ public class TestRMRestart {
Assert.assertEquals(NodeAction.RESYNC, hbResponse.getNodeAction());
// new NM to represent NM re-register
- nm1 = rm2.registerNode("127.0.0.1:1234", 15120);
- nm2 = rm2.registerNode("127.0.0.2:5678", 15120);
+ nm1 = new MockNM("127.0.0.1:1234", 15120, rm2.getResourceTrackerService());
+ nm2 = new MockNM("127.0.0.2:5678", 15120, rm2.getResourceTrackerService());
+
+ List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
+ ContainerStatus containerStatus =
+ BuilderUtils.newContainerStatus(BuilderUtils.newContainerId(loadedApp1
+ .getCurrentAppAttempt().getAppAttemptId(), 1),
+ ContainerState.COMPLETE, "Killed AM container", 143);
+ containerStatuses.add(containerStatus);
+ nm1.registerNode(containerStatuses);
+ nm2.registerNode();
+
+ rm2.waitForState(loadedApp1.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(2, loadedApp1.getAppAttempts().size());
// verify no more reboot response sent
hbResponse = nm1.nodeHeartbeat(true);
@@ -404,6 +426,157 @@ public class TestRMRestart {
}
@Test
+ public void testRMRestartWaitForPreviousAMToFinish() throws Exception {
+ // testing 3 cases
+ // After RM restarts
+ // 1) New application attempt is not started until previous AM container
+ // finish event is reported back to RM as a part of nm registration.
+ // 2) If the previous AM container's finish event is never reported back
+ // (i.e. the node manager on which this AM container was running also went
+ // down), AMLivelinessMonitor should time out the previous attempt and
+ // start a new attempt.
+ // 3) If all the stored attempts had finished then new attempt should
+ // be started immediately.
+ YarnConfiguration conf = new YarnConfiguration(this.conf);
+ conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 40);
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ RMState rmState = memStore.getState();
+ Map<ApplicationId, ApplicationState> rmAppState =
+ rmState.getApplicationState();
+
+ // start RM
+ final MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234" , 16382, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // submitting app
+ RMApp app1 = rm1.submitApp(200);
+ rm1.waitForState(app1.getApplicationId(), RMAppState.ACCEPTED);
+ MockAM am1 = launchAM(app1, rm1, nm1);
+ nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE);
+ // Fail first AM.
+ am1.waitForState(RMAppAttemptState.FAILED);
+
+ // launch another AM.
+ MockAM am2 = launchAM(app1, rm1, nm1);
+
+ Assert.assertEquals(1, rmAppState.size());
+ Assert.assertEquals(app1.getState(), RMAppState.RUNNING);
+ Assert.assertEquals(app1.getAppAttempts()
+ .get(app1.getCurrentAppAttempt().getAppAttemptId())
+ .getAppAttemptState(), RMAppAttemptState.RUNNING);
+
+ // start new RM.
+ MockRM rm2 = null;
+ rm2 = new MockRM(conf, memStore);
+ rm2.start();
+
+ nm1.setResourceTrackerService(rm2.getResourceTrackerService());
+ NodeHeartbeatResponse res = nm1.nodeHeartbeat(true);
+ Assert.assertEquals(NodeAction.RESYNC, res.getNodeAction());
+
+ RMApp rmApp = rm2.getRMContext().getRMApps().get(app1.getApplicationId());
+ // application should be in running state
+ rm2.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+
+ Assert.assertEquals(RMAppState.RUNNING, rmApp.getState());
+ // new attempt should not be started
+ Assert.assertEquals(2, rmApp.getAppAttempts().size());
+ // am1 attempt should be in FAILED state where as am2 attempt should be in
+ // LAUNCHED state
+ Assert.assertEquals(RMAppAttemptState.FAILED,
+ rmApp.getAppAttempts().get(am1.getApplicationAttemptId())
+ .getAppAttemptState());
+ Assert.assertEquals(RMAppAttemptState.LAUNCHED,
+ rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
+ .getAppAttemptState());
+
+ List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
+ ContainerStatus containerStatus =
+ BuilderUtils.newContainerStatus(
+ BuilderUtils.newContainerId(am2.getApplicationAttemptId(), 1),
+ ContainerState.COMPLETE, "Killed AM container", 143);
+ containerStatuses.add(containerStatus);
+ nm1.registerNode(containerStatuses);
+ rm2.waitForState(am2.getApplicationAttemptId(), RMAppAttemptState.FAILED);
+ launchAM(rmApp, rm2, nm1);
+ Assert.assertEquals(3, rmApp.getAppAttempts().size());
+ rm2.waitForState(rmApp.getCurrentAppAttempt().getAppAttemptId(),
+ RMAppAttemptState.RUNNING);
+ // Now restart RM ...
+ // Setting AMLivelinessMonitor interval to be 10 Secs.
+ conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
+ MockRM rm3 = null;
+ rm3 = new MockRM(conf, memStore);
+ rm3.start();
+
+ // Wait for RM to process all the events as a part of rm recovery.
+ nm1.setResourceTrackerService(rm3.getResourceTrackerService());
+
+ rmApp = rm3.getRMContext().getRMApps().get(app1.getApplicationId());
+ // application should be in running state
+ rm3.waitForState(app1.getApplicationId(), RMAppState.RUNNING);
+ Assert.assertEquals(rmApp.getState(), RMAppState.RUNNING);
+ // new attempt should not be started
+ Assert.assertEquals(3, rmApp.getAppAttempts().size());
+ // am1 and am2 attempts should be in FAILED state where as am3 should be
+ // in LAUNCHED state
+ Assert.assertEquals(RMAppAttemptState.FAILED,
+ rmApp.getAppAttempts().get(am1.getApplicationAttemptId())
+ .getAppAttemptState());
+ Assert.assertEquals(RMAppAttemptState.FAILED,
+ rmApp.getAppAttempts().get(am2.getApplicationAttemptId())
+ .getAppAttemptState());
+ ApplicationAttemptId latestAppAttemptId =
+ rmApp.getCurrentAppAttempt().getAppAttemptId();
+ Assert.assertEquals(RMAppAttemptState.LAUNCHED,rmApp.getAppAttempts()
+ .get(latestAppAttemptId).getAppAttemptState());
+
+ rm3.waitForState(latestAppAttemptId, RMAppAttemptState.FAILED);
+ rm3.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(4, rmApp.getAppAttempts().size());
+ Assert.assertEquals(RMAppAttemptState.FAILED,
+ rmApp.getAppAttempts().get(latestAppAttemptId).getAppAttemptState());
+
+ latestAppAttemptId = rmApp.getCurrentAppAttempt().getAppAttemptId();
+
+ // The 4th attempt has started but is not yet saved into RMStateStore
+ // It will be saved only when we launch AM.
+
+ // submitting app but not starting AM for it.
+ RMApp app2 = rm3.submitApp(200);
+ rm3.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(1, app2.getAppAttempts().size());
+ Assert.assertEquals(0,
+ memStore.getState().getApplicationState().get(app2.getApplicationId())
+ .getAttemptCount());
+
+ MockRM rm4 = null;
+ rm4 = new MockRM(conf, memStore);
+ rm4.start();
+
+ rmApp = rm4.getRMContext().getRMApps().get(app1.getApplicationId());
+ rm4.waitForState(rmApp.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(4, rmApp.getAppAttempts().size());
+ Assert.assertEquals(RMAppState.ACCEPTED, rmApp.getState());
+ Assert.assertEquals(RMAppAttemptState.SCHEDULED, rmApp.getAppAttempts()
+ .get(latestAppAttemptId).getAppAttemptState());
+
+ // The initial application for which an AM was not started should be in
+ // ACCEPTED state with one application attempt started.
+ app2 = rm4.getRMContext().getRMApps().get(app2.getApplicationId());
+ rm4.waitForState(app2.getApplicationId(), RMAppState.ACCEPTED);
+ Assert.assertEquals(RMAppState.ACCEPTED, app2.getState());
+ Assert.assertEquals(1, app2.getAppAttempts().size());
+ Assert.assertEquals(RMAppAttemptState.SCHEDULED, app2
+ .getCurrentAppAttempt().getAppAttemptState());
+
+ }
+
+ @Test
public void testRMRestartFailedApp() throws Exception {
conf.setInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, 1);
MemoryRMStateStore memStore = new MemoryRMStateStore();
@@ -736,6 +909,8 @@ public class TestRMRestart {
Assert.assertEquals(BuilderUtils.newContainerId(attemptId1, 1),
attemptState.getMasterContainer().getId());
+ // Setting AMLivelinessMonitor interval to be 10 Secs.
+ conf.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 10000);
// start new RM
MockRM rm2 = new MockRM(conf, memStore);
rm2.start();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java?rev=1543310&r1=1543309&r2=1543310&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java Tue Nov 19 05:17:20 2013
@@ -372,10 +372,10 @@ public class TestRMAppAttemptTransitions
}
/**
- * {@link RMAppAttemptState#RECOVERED}
+ * {@link RMAppAttemptState#LAUNCHED}
*/
private void testAppAttemptRecoveredState() {
- assertEquals(RMAppAttemptState.RECOVERED,
+ assertEquals(RMAppAttemptState.LAUNCHED,
applicationAttempt.getAppAttemptState());
}