You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text rendering.
Posted to yarn-commits@hadoop.apache.org by ar...@apache.org on 2013/11/19 18:26:26 UTC
svn commit: r1543510 [1/2] - in
/hadoop/common/branches/HDFS-2832/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/ya...
Author: arp
Date: Tue Nov 19 17:26:23 2013
New Revision: 1543510
URL: http://svn.apache.org/r1543510
Log:
Merging r1543111 through r1543509 from trunk to branch HDFS-2832
Added:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/SchedulerPageUtil.java
- copied unchanged from r1543509, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/SchedulerPageUtil.java
Modified:
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/FairSchedulerPage.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockNM.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRMRestart.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/TestRMAppAttemptTransitions.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/CHANGES.txt Tue Nov 19 17:26:23 2013
@@ -100,6 +100,20 @@ Release 2.3.0 - UNRELEASED
YARN-1222. Make improvements in ZKRMStateStore for fencing (Karthik
Kambatla via bikas)
+ YARN-709. Added tests to verify validity of delegation tokens and logging of
+ appsummary after RM restart. (Jian He via vinodkv)
+
+ YARN-1210. Changed RM to start new app-attempts on RM restart only after
+ ensuring that previous AM exited or after expiry time. (Omkar Vinit Joshi via
+ vinodkv)
+
+ YARN-674. Fixed ResourceManager to renew DelegationTokens on submission
+ asynchronously to work around potential slowness in state-store. (Omkar Vinit
+ Joshi via vinodkv)
+
+ YARN-584. In scheduler web UIs, queues unexpand on refresh. (Harshit
+ Daga via Sandy Ryza)
+
OPTIMIZATIONS
BUG FIXES
@@ -139,6 +153,9 @@ Release 2.3.0 - UNRELEASED
YARN-1411. HA config shouldn't affect NodeManager RPC addresses (Karthik
Kambatla via bikas)
+ YARN-1419. TestFifoScheduler.testAppAttemptMetrics fails intermittently
+ under jdk7 (Jonathan Eagles via jlowe)
+
Release 2.2.1 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Tue Nov 19 17:26:23 2013
@@ -504,6 +504,11 @@ public class YarnConfiguration extends C
RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
30000l;
+
+ /** Delegation Token renewer thread count */
+ public static final String RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT =
+ RM_PREFIX + "delegation-token-renewer.thread-count";
+ public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50;
/** Whether to enable log aggregation */
public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatRequest.java Tue Nov 19 17:26:23 2013
@@ -20,15 +20,29 @@ package org.apache.hadoop.yarn.server.ap
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.util.Records;
-public interface NodeHeartbeatRequest {
+public abstract class NodeHeartbeatRequest {
+
+ public static NodeHeartbeatRequest newInstance(NodeStatus nodeStatus,
+ MasterKey lastKnownContainerTokenMasterKey,
+ MasterKey lastKnownNMTokenMasterKey) {
+ NodeHeartbeatRequest nodeHeartbeatRequest =
+ Records.newRecord(NodeHeartbeatRequest.class);
+ nodeHeartbeatRequest.setNodeStatus(nodeStatus);
+ nodeHeartbeatRequest
+ .setLastKnownContainerTokenMasterKey(lastKnownContainerTokenMasterKey);
+ nodeHeartbeatRequest
+ .setLastKnownNMTokenMasterKey(lastKnownNMTokenMasterKey);
+ return nodeHeartbeatRequest;
+ }
- NodeStatus getNodeStatus();
- void setNodeStatus(NodeStatus status);
+ public abstract NodeStatus getNodeStatus();
+ public abstract void setNodeStatus(NodeStatus status);
- MasterKey getLastKnownContainerTokenMasterKey();
- void setLastKnownContainerTokenMasterKey(MasterKey secretKey);
+ public abstract MasterKey getLastKnownContainerTokenMasterKey();
+ public abstract void setLastKnownContainerTokenMasterKey(MasterKey secretKey);
- MasterKey getLastKnownNMTokenMasterKey();
- void setLastKnownNMTokenMasterKey(MasterKey secretKey);
+ public abstract MasterKey getLastKnownNMTokenMasterKey();
+ public abstract void setLastKnownNMTokenMasterKey(MasterKey secretKey);
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java Tue Nov 19 17:26:23 2013
@@ -18,17 +18,37 @@
package org.apache.hadoop.yarn.server.api.protocolrecords;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.Records;
-public interface RegisterNodeManagerRequest {
- NodeId getNodeId();
- int getHttpPort();
- Resource getResource();
- String getNMVersion();
+public abstract class RegisterNodeManagerRequest {
+
+ public static RegisterNodeManagerRequest newInstance(NodeId nodeId,
+ int httpPort, Resource resource, String nodeManagerVersionId,
+ List<ContainerStatus> containerStatuses) {
+ RegisterNodeManagerRequest request =
+ Records.newRecord(RegisterNodeManagerRequest.class);
+ request.setHttpPort(httpPort);
+ request.setResource(resource);
+ request.setNodeId(nodeId);
+ request.setNMVersion(nodeManagerVersionId);
+ request.setContainerStatuses(containerStatuses);
+ return request;
+ }
+
+ public abstract NodeId getNodeId();
+ public abstract int getHttpPort();
+ public abstract Resource getResource();
+ public abstract String getNMVersion();
+ public abstract List<ContainerStatus> getContainerStatuses();
- void setNodeId(NodeId nodeId);
- void setHttpPort(int port);
- void setResource(Resource resource);
- void setNMVersion(String version);
+ public abstract void setNodeId(NodeId nodeId);
+ public abstract void setHttpPort(int port);
+ public abstract void setResource(Resource resource);
+ public abstract void setNMVersion(String version);
+ public abstract void setContainerStatuses(List<ContainerStatus> containerStatuses);
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatRequestPBImpl.java Tue Nov 19 17:26:23 2013
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
@@ -29,8 +28,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
import org.apache.hadoop.yarn.server.api.records.impl.pb.NodeStatusPBImpl;
-public class NodeHeartbeatRequestPBImpl extends
- ProtoBase<NodeHeartbeatRequestProto> implements NodeHeartbeatRequest {
+public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
NodeHeartbeatRequestProto proto = NodeHeartbeatRequestProto.getDefaultInstance();
NodeHeartbeatRequestProto.Builder builder = null;
boolean viaProto = false;
@@ -55,6 +53,21 @@ public class NodeHeartbeatRequestPBImpl
return proto;
}
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
private void mergeLocalToBuilder() {
if (this.nodeStatus != null) {
builder.setNodeStatus(convertToProtoFormat(this.nodeStatus));
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java Tue Nov 19 17:26:23 2013
@@ -19,11 +19,21 @@
package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
@@ -32,13 +42,14 @@ import org.apache.hadoop.yarn.server.api
-public class RegisterNodeManagerRequestPBImpl extends ProtoBase<RegisterNodeManagerRequestProto> implements RegisterNodeManagerRequest {
+public class RegisterNodeManagerRequestPBImpl extends RegisterNodeManagerRequest {
RegisterNodeManagerRequestProto proto = RegisterNodeManagerRequestProto.getDefaultInstance();
RegisterNodeManagerRequestProto.Builder builder = null;
boolean viaProto = false;
private Resource resource = null;
private NodeId nodeId = null;
+ private List<ContainerStatus> containerStatuses = null;
public RegisterNodeManagerRequestPBImpl() {
builder = RegisterNodeManagerRequestProto.newBuilder();
@@ -57,6 +68,9 @@ public class RegisterNodeManagerRequestP
}
private void mergeLocalToBuilder() {
+ if (this.containerStatuses != null) {
+ addContainerStatusesToProto();
+ }
if (this.resource != null) {
builder.setResource(convertToProtoFormat(this.resource));
}
@@ -140,6 +154,81 @@ public class RegisterNodeManagerRequestP
}
@Override
+ public List<ContainerStatus> getContainerStatuses() {
+ initContainerStatuses();
+ return containerStatuses;
+ }
+
+ private void initContainerStatuses() {
+ if (this.containerStatuses != null) {
+ return;
+ }
+ RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
+ List<ContainerStatusProto> list = p.getContainerStatusesList();
+ this.containerStatuses = new ArrayList<ContainerStatus>();
+ for (ContainerStatusProto c : list) {
+ this.containerStatuses.add(convertFromProtoFormat(c));
+ }
+ }
+
+ @Override
+ public void setContainerStatuses(List<ContainerStatus> containers) {
+ if (containers == null) {
+ return;
+ }
+ initContainerStatuses();
+ this.containerStatuses.addAll(containers);
+ }
+
+ private void addContainerStatusesToProto() {
+ maybeInitBuilder();
+ builder.clearContainerStatuses();
+ if (containerStatuses == null) {
+ return;
+ }
+ Iterable<ContainerStatusProto> it = new Iterable<ContainerStatusProto>() {
+
+ @Override
+ public Iterator<ContainerStatusProto> iterator() {
+ return new Iterator<ContainerStatusProto>() {
+ Iterator<ContainerStatus> iter = containerStatuses.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public ContainerStatusProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+ builder.addAllContainerStatuses(it);
+ }
+
+ @Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
public String getNMVersion() {
RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
if (!p.hasNmVersion()) {
@@ -170,6 +259,11 @@ public class RegisterNodeManagerRequestP
return ((ResourcePBImpl)t).getProto();
}
-
-
-}
+ private ContainerStatusPBImpl convertFromProtoFormat(ContainerStatusProto c) {
+ return new ContainerStatusPBImpl(c);
+ }
+
+ private ContainerStatusProto convertToProtoFormat(ContainerStatus c) {
+ return ((ContainerStatusPBImpl)c).getProto();
+ }
+}
\ No newline at end of file
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java Tue Nov 19 17:26:23 2013
@@ -22,10 +22,24 @@ import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.util.Records;
-public interface NodeStatus {
+public abstract class NodeStatus {
+ public static NodeStatus newInstance(NodeId nodeId, int responseId,
+ List<ContainerStatus> containerStatuses,
+ List<ApplicationId> keepAliveApplications,
+ NodeHealthStatus nodeHealthStatus) {
+ NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
+ nodeStatus.setResponseId(responseId);
+ nodeStatus.setNodeId(nodeId);
+ nodeStatus.setContainersStatuses(containerStatuses);
+ nodeStatus.setKeepAliveApplications(keepAliveApplications);
+ nodeStatus.setNodeHealthStatus(nodeHealthStatus);
+ return nodeStatus;
+ }
+
public abstract NodeId getNodeId();
public abstract int getResponseId();
@@ -36,8 +50,8 @@ public interface NodeStatus {
public abstract List<ApplicationId> getKeepAliveApplications();
public abstract void setKeepAliveApplications(List<ApplicationId> appIds);
- NodeHealthStatus getNodeHealthStatus();
- void setNodeHealthStatus(NodeHealthStatus healthStatus);
+ public abstract NodeHealthStatus getNodeHealthStatus();
+ public abstract void setNodeHealthStatus(NodeHealthStatus healthStatus);
public abstract void setNodeId(NodeId nodeId);
public abstract void setResponseId(int responseId);
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java Tue Nov 19 17:26:23 2013
@@ -29,7 +29,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@@ -40,8 +39,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
-public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements
- NodeStatus {
+public class NodeStatusPBImpl extends NodeStatus {
NodeStatusProto proto = NodeStatusProto.getDefaultInstance();
NodeStatusProto.Builder builder = null;
boolean viaProto = false;
@@ -167,6 +165,21 @@ public class NodeStatusPBImpl extends Pr
}
@Override
+ public int hashCode() {
+ return getProto().hashCode();
+ }
+
+ @Override
+ public boolean equals(Object other) {
+ if (other == null)
+ return false;
+ if (other.getClass().isAssignableFrom(this.getClass())) {
+ return this.getProto().equals(this.getClass().cast(other).getProto());
+ }
+ return false;
+ }
+
+ @Override
public synchronized int getResponseId() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
return p.getResponseId();
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto Tue Nov 19 17:26:23 2013
@@ -30,6 +30,7 @@ message RegisterNodeManagerRequestProto
optional int32 http_port = 3;
optional ResourceProto resource = 4;
optional string nm_version = 5;
+ repeated ContainerStatusProto containerStatuses = 6;
}
message RegisterNodeManagerResponseProto {
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdater.java Tue Nov 19 17:26:23 2013
@@ -26,7 +26,7 @@ public interface NodeStatusUpdater exten
void sendOutofBandHeartBeat();
- NodeStatus getNodeStatusAndUpdateContainersInContext();
+ NodeStatus getNodeStatusAndUpdateContainersInContext(int responseId);
long getRMIdentifier();
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Tue Nov 19 17:26:23 2013
@@ -48,8 +48,6 @@ import org.apache.hadoop.yarn.conf.YarnC
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.api.ResourceManagerConstants;
import org.apache.hadoop.yarn.server.api.ResourceTracker;
import org.apache.hadoop.yarn.server.api.ServerRMProxy;
@@ -89,7 +87,6 @@ public class NodeStatusUpdaterImpl exten
private String nodeManagerVersionId;
private String minimumResourceManagerVersion;
private volatile boolean isStopped;
- private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private boolean tokenKeepAliveEnabled;
private long tokenRemovalDelayMs;
/** Keeps track of when the next keep alive request should be sent for an app*/
@@ -134,9 +131,7 @@ public class NodeStatusUpdaterImpl exten
conf.getInt(
YarnConfiguration.NM_VCORES, YarnConfiguration.DEFAULT_NM_VCORES);
- this.totalResource = recordFactory.newRecordInstance(Resource.class);
- this.totalResource.setMemory(memoryMb);
- this.totalResource.setVirtualCores(virtualCores);
+ this.totalResource = Resource.newInstance(memoryMb, virtualCores);
metrics.addResource(totalResource);
this.tokenKeepAliveEnabled = isTokenKeepAliveEnabled(conf);
this.tokenRemovalDelayMs =
@@ -238,13 +233,17 @@ public class NodeStatusUpdaterImpl exten
}
@VisibleForTesting
- protected void registerWithRM() throws YarnException, IOException {
+ protected void registerWithRM()
+ throws YarnException, IOException {
+ List<ContainerStatus> containerStatuses =
+ this.updateAndGetContainerStatuses();
RegisterNodeManagerRequest request =
- recordFactory.newRecordInstance(RegisterNodeManagerRequest.class);
- request.setHttpPort(this.httpPort);
- request.setResource(this.totalResource);
- request.setNodeId(this.nodeId);
- request.setNMVersion(this.nodeManagerVersionId);
+ RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
+ nodeManagerVersionId, containerStatuses);
+ if (containerStatuses != null) {
+ LOG.info("Registering with RM using finished containers :"
+ + containerStatuses);
+ }
RegisterNodeManagerResponse regNMResponse =
resourceTracker.registerNodeManager(request);
this.rmIdentifier = regNMResponse.getRMIdentifier();
@@ -323,13 +322,33 @@ public class NodeStatusUpdaterImpl exten
}
@Override
- public NodeStatus getNodeStatusAndUpdateContainersInContext() {
+ public NodeStatus getNodeStatusAndUpdateContainersInContext(
+ int responseId) {
- NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
- nodeStatus.setNodeId(this.nodeId);
+ NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
+ nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
+ nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
+ nodeHealthStatus.setLastHealthReportTime(
+ healthChecker.getLastHealthReportTime());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
+ + ", " + nodeHealthStatus.getHealthReport());
+ }
+ List<ContainerStatus> containersStatuses = updateAndGetContainerStatuses();
+ LOG.debug(this.nodeId + " sending out status for "
+ + containersStatuses.size() + " containers");
+ NodeStatus nodeStatus = NodeStatus.newInstance(nodeId, responseId,
+ containersStatuses, createKeepAliveApplicationList(), nodeHealthStatus);
- int numActiveContainers = 0;
- List<ContainerStatus> containersStatuses = new ArrayList<ContainerStatus>();
+ return nodeStatus;
+ }
+
+ /*
+ * It will return current container statuses. If any container has
+ * COMPLETED then it will be removed from context.
+ */
+ private List<ContainerStatus> updateAndGetContainerStatuses() {
+ List<ContainerStatus> containerStatuses = new ArrayList<ContainerStatus>();
for (Iterator<Entry<ContainerId, Container>> i =
this.context.getContainers().entrySet().iterator(); i.hasNext();) {
Entry<ContainerId, Container> e = i.next();
@@ -339,8 +358,7 @@ public class NodeStatusUpdaterImpl exten
// Clone the container to send it to the RM
org.apache.hadoop.yarn.api.records.ContainerStatus containerStatus =
container.cloneAndGetContainerStatus();
- containersStatuses.add(containerStatus);
- ++numActiveContainers;
+ containerStatuses.add(containerStatus);
if (LOG.isDebugEnabled()) {
LOG.debug("Sending out status for container: " + containerStatus);
}
@@ -356,26 +374,7 @@ public class NodeStatusUpdaterImpl exten
LOG.info("Removed completed container " + containerId);
}
}
- nodeStatus.setContainersStatuses(containersStatuses);
-
- LOG.debug(this.nodeId + " sending out status for "
- + numActiveContainers + " containers");
-
- NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
- nodeHealthStatus.setHealthReport(healthChecker.getHealthReport());
- nodeHealthStatus.setIsNodeHealthy(healthChecker.isHealthy());
- nodeHealthStatus.setLastHealthReportTime(
- healthChecker.getLastHealthReportTime());
- if (LOG.isDebugEnabled()) {
- LOG.debug("Node's health-status : " + nodeHealthStatus.getIsNodeHealthy()
- + ", " + nodeHealthStatus.getHealthReport());
- }
- nodeStatus.setNodeHealthStatus(nodeHealthStatus);
-
- List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
- nodeStatus.setKeepAliveApplications(keepAliveAppIds);
-
- return nodeStatus;
+ return containerStatuses;
}
private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
@@ -458,18 +457,15 @@ public class NodeStatusUpdaterImpl exten
// Send heartbeat
try {
NodeHeartbeatResponse response = null;
- NodeStatus nodeStatus = getNodeStatusAndUpdateContainersInContext();
- nodeStatus.setResponseId(lastHeartBeatID);
+ NodeStatus nodeStatus =
+ getNodeStatusAndUpdateContainersInContext(lastHeartBeatID);
- NodeHeartbeatRequest request = recordFactory
- .newRecordInstance(NodeHeartbeatRequest.class);
- request.setNodeStatus(nodeStatus);
- request
- .setLastKnownContainerTokenMasterKey(NodeStatusUpdaterImpl.this.context
- .getContainerTokenSecretManager().getCurrentKey());
- request
- .setLastKnownNMTokenMasterKey(NodeStatusUpdaterImpl.this.context
- .getNMTokenSecretManager().getCurrentKey());
+ NodeHeartbeatRequest request =
+ NodeHeartbeatRequest.newInstance(nodeStatus,
+ NodeStatusUpdaterImpl.this.context
+ .getContainerTokenSecretManager().getCurrentKey(),
+ NodeStatusUpdaterImpl.this.context.getNMTokenSecretManager()
+ .getCurrentKey());
response = resourceTracker.nodeHeartbeat(request);
//get next heartbeat interval from response
nextHeartBeatInterval = response.getNextHeartBeatInterval();
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Tue Nov 19 17:26:23 2013
@@ -28,6 +28,7 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -64,6 +65,7 @@ import org.apache.hadoop.yarn.api.protoc
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SerializedException;
@@ -371,17 +373,31 @@ public class ContainerManagerImpl extend
this.handle(new CMgrCompletedContainersEvent(containerIds,
CMgrCompletedContainersEvent.Reason.ON_NODEMANAGER_RESYNC));
- while (!containers.isEmpty()) {
- try {
- Thread.sleep(1000);
- nodeStatusUpdater.getNodeStatusAndUpdateContainersInContext();
- } catch (InterruptedException ex) {
- LOG.warn("Interrupted while sleeping on container kill on resync", ex);
+
+ /*
+ * We will wait till all the containers change their state to COMPLETE. We
+ * will not remove the container statuses from nm context because these
+ * are used while re-registering node manager with resource manager.
+ */
+ boolean allContainersCompleted = false;
+ while (!containers.isEmpty() && !allContainersCompleted) {
+ allContainersCompleted = true;
+ for (Entry<ContainerId, Container> container : containers.entrySet()) {
+ if (((ContainerImpl) container.getValue()).getCurrentState()
+ != ContainerState.COMPLETE) {
+ allContainersCompleted = false;
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException ex) {
+ LOG.warn("Interrupted while sleeping on container kill on resync",
+ ex);
+ }
+ break;
+ }
}
}
-
// All containers killed
- if (containers.isEmpty()) {
+ if (allContainersCompleted) {
LOG.info("All containers in DONE state");
} else {
LOG.info("Done waiting for containers to be killed. Still alive: " +
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/container/ContainerImpl.java Tue Nov 19 17:26:23 2013
@@ -302,7 +302,7 @@ public class ContainerImpl implements Co
private final StateMachine<ContainerState, ContainerEventType, ContainerEvent>
stateMachine;
- private org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
+ public org.apache.hadoop.yarn.api.records.ContainerState getCurrentState() {
switch (stateMachine.getCurrentState()) {
case NEW:
case LOCALIZING:
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java Tue Nov 19 17:26:23 2013
@@ -318,7 +318,7 @@ public class ClientRMService extends Abs
try {
// call RMAppManager to submit application directly
rmAppManager.submitApplication(submissionContext,
- System.currentTimeMillis(), false, user);
+ System.currentTimeMillis(), user, false, null);
LOG.info("Application with id " + applicationId.getId() +
" submitted by user " + user);
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Tue Nov 19 17:26:23 2013
@@ -47,6 +47,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -54,6 +55,8 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* This class manages the list of applications for the resource manager.
*/
@@ -165,6 +168,11 @@ public class RMAppManager implements Eve
}
}
+ @VisibleForTesting
+ public void logApplicationSummary(ApplicationId appId) {
+ ApplicationSummary.logAppSummary(rmContext.getRMApps().get(appId));
+ }
+
protected synchronized void setCompletedAppsMax(int max) {
this.completedAppsMax = max;
}
@@ -229,35 +237,63 @@ public class RMAppManager implements Eve
this.applicationACLsManager.removeApplication(removeId);
}
}
-
+
@SuppressWarnings("unchecked")
protected void submitApplication(
ApplicationSubmissionContext submissionContext, long submitTime,
- boolean isRecovered, String user) throws YarnException {
+ String user, boolean isRecovered, RMState state) throws YarnException {
ApplicationId applicationId = submissionContext.getApplicationId();
- // Validation of the ApplicationSubmissionContext needs to be completed
- // here. Only those fields that are dependent on RM's configuration are
- // checked here as they have to be validated whether they are part of new
- // submission or just being recovered.
+ RMAppImpl application =
+ createAndPopulateNewRMApp(submissionContext, submitTime, user);
- // Check whether AM resource requirements are within required limits
- if (!submissionContext.getUnmanagedAM()) {
- ResourceRequest amReq = BuilderUtils.newResourceRequest(
- RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
- submissionContext.getResource(), 1);
- try {
- SchedulerUtils.validateResourceRequest(amReq,
- scheduler.getMaximumResourceCapability());
- } catch (InvalidResourceRequestException e) {
- LOG.warn("RM app submission failed in validating AM resource request"
- + " for application " + applicationId, e);
- throw e;
+ if (isRecovered) {
+ recoverApplication(state, application);
+ RMAppState rmAppState =
+ state.getApplicationState().get(applicationId).getState();
+ if (isApplicationInFinalState(rmAppState)) {
+ // We are synchronously moving the application into final state so that
+ // momentarily client will not see this application in NEW state. Also
+ // for finished applications we will avoid renewing tokens.
+ application
+ .handle(new RMAppEvent(applicationId, RMAppEventType.RECOVER));
+ return;
}
}
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ Credentials credentials = null;
+ try {
+ credentials = parseCredentials(submissionContext);
+ } catch (Exception e) {
+ LOG.warn(
+ "Unable to parse credentials.", e);
+ // Sending APP_REJECTED is fine, since we assume that the
+ // RMApp is in NEW state and thus we haven't yet informed the
+ // scheduler about the existence of the application
+ assert application.getState() == RMAppState.NEW;
+ this.rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppRejectedEvent(applicationId, e.getMessage()));
+ throw RPCUtil.getRemoteException(e);
+ }
+ this.rmContext.getDelegationTokenRenewer().addApplication(
+ applicationId, credentials,
+ submissionContext.getCancelTokensWhenComplete(), isRecovered);
+ } else {
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(applicationId,
+ isRecovered ? RMAppEventType.RECOVER : RMAppEventType.START));
+ }
+ }
+ private RMAppImpl createAndPopulateNewRMApp(
+ ApplicationSubmissionContext submissionContext,
+ long submitTime, String user)
+ throws YarnException {
+ ApplicationId applicationId = submissionContext.getApplicationId();
+ validateResourceRequest(submissionContext);
// Create RMApp
- RMApp application =
+ RMAppImpl application =
new RMAppImpl(applicationId, rmContext, this.conf,
submissionContext.getApplicationName(), user,
submissionContext.getQueue(),
@@ -274,35 +310,52 @@ public class RMAppManager implements Eve
LOG.warn(message);
throw RPCUtil.getRemoteException(message);
}
-
// Inform the ACLs Manager
this.applicationACLsManager.addApplication(applicationId,
submissionContext.getAMContainerSpec().getApplicationACLs());
+ return application;
+ }
+
+ private void validateResourceRequest(
+ ApplicationSubmissionContext submissionContext)
+ throws InvalidResourceRequestException {
+ // Validation of the ApplicationSubmissionContext needs to be completed
+ // here. Only those fields that are dependent on RM's configuration are
+ // checked here as they have to be validated whether they are part of new
+ // submission or just being recovered.
+ // Check whether AM resource requirements are within required limits
+ if (!submissionContext.getUnmanagedAM()) {
+ ResourceRequest amReq = BuilderUtils.newResourceRequest(
+ RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
+ submissionContext.getResource(), 1);
+ try {
+ SchedulerUtils.validateResourceRequest(amReq,
+ scheduler.getMaximumResourceCapability());
+ } catch (InvalidResourceRequestException e) {
+ LOG.warn("RM app submission failed in validating AM resource request"
+ + " for application " + submissionContext.getApplicationId(), e);
+ throw e;
+ }
+ }
+ }
+
+ private void recoverApplication(RMState state, RMAppImpl application)
+ throws YarnException {
try {
- // Setup tokens for renewal
- if (UserGroupInformation.isSecurityEnabled()) {
- this.rmContext.getDelegationTokenRenewer().addApplication(
- applicationId,parseCredentials(submissionContext),
- submissionContext.getCancelTokensWhenComplete()
- );
- }
- } catch (IOException ie) {
- LOG.warn(
- "Unable to add the application to the delegation token renewer.",
- ie);
- // Sending APP_REJECTED is fine, since we assume that the
- // RMApp is in NEW state and thus we havne't yet informed the
- // Scheduler about the existence of the application
- this.rmContext.getDispatcher().getEventHandler().handle(
- new RMAppRejectedEvent(applicationId, ie.getMessage()));
- throw RPCUtil.getRemoteException(ie);
+ application.recover(state);
+ } catch (Exception e) {
+ LOG.error("Error recovering application", e);
+ throw new YarnException(e);
}
+ }
- if (!isRecovered) {
- // All done, start the RMApp
- this.rmContext.getDispatcher().getEventHandler()
- .handle(new RMAppEvent(applicationId, RMAppEventType.START));
+ private boolean isApplicationInFinalState(RMAppState rmAppState) {
+ if (rmAppState == RMAppState.FINISHED || rmAppState == RMAppState.FAILED
+ || rmAppState == RMAppState.KILLED) {
+ return true;
+ } else {
+ return false;
}
}
@@ -328,17 +381,9 @@ public class RMAppManager implements Eve
LOG.info("Recovering " + appStates.size() + " applications");
for (ApplicationState appState : appStates.values()) {
LOG.info("Recovering application " + appState.getAppId());
+
submitApplication(appState.getApplicationSubmissionContext(),
- appState.getSubmitTime(), true, appState.getUser());
- // re-populate attempt information in application
- RMAppImpl appImpl =
- (RMAppImpl) rmContext.getRMApps().get(appState.getAppId());
- appImpl.recover(state);
- // Recover the app synchronously, as otherwise client is possible to see
- // the application not recovered before it is actually recovered because
- // ClientRMService is already started at this point of time.
- appImpl.handle(new RMAppEvent(appImpl.getApplicationId(),
- RMAppEventType.RECOVER));
+ appState.getSubmitTime(), appState.getUser(), true, state);
}
}
@@ -351,8 +396,7 @@ public class RMAppManager implements Eve
case APP_COMPLETED:
{
finishApplication(applicationId);
- ApplicationSummary.logAppSummary(
- rmContext.getRMApps().get(applicationId));
+ logApplicationSummary(applicationId);
checkAppNumCompletedLimit();
}
break;
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Tue Nov 19 17:26:23 2013
@@ -29,6 +29,10 @@ import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.VersionUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceOption;
@@ -46,14 +50,17 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.records.MasterKey;
import org.apache.hadoop.yarn.server.api.records.NodeAction;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeReconnectEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeStatusEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
+import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.authorize.RMPolicyProvider;
import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
import org.apache.hadoop.yarn.util.RackResolver;
@@ -183,6 +190,33 @@ public class ResourceTrackerService exte
Resource capability = request.getResource();
String nodeManagerVersion = request.getNMVersion();
+ if (!request.getContainerStatuses().isEmpty()) {
+ LOG.info("received container statuses on node manager register :"
+ + request.getContainerStatuses());
+ for (ContainerStatus containerStatus : request.getContainerStatuses()) {
+ ApplicationAttemptId appAttemptId =
+ containerStatus.getContainerId().getApplicationAttemptId();
+ RMApp rmApp =
+ rmContext.getRMApps().get(appAttemptId.getApplicationId());
+ if (rmApp != null) {
+ RMAppAttempt rmAppAttempt = rmApp.getRMAppAttempt(appAttemptId);
+ if (rmAppAttempt.getMasterContainer().getId()
+ .equals(containerStatus.getContainerId())
+ && containerStatus.getState() == ContainerState.COMPLETE) {
+ // sending master container finished event.
+ RMAppAttemptContainerFinishedEvent evt =
+ new RMAppAttemptContainerFinishedEvent(appAttemptId,
+ containerStatus);
+ rmContext.getDispatcher().getEventHandler().handle(evt);
+ }
+ } else {
+ LOG.error("Received finished container :"
+ + containerStatus.getContainerId()
+ + " for non existing application :"
+ + appAttemptId.getApplicationId());
+ }
+ }
+ }
RegisterNodeManagerResponse response = recordFactory
.newRecordInstance(RegisterNodeManagerResponse.class);
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java Tue Nov 19 17:26:23 2013
@@ -132,8 +132,8 @@ public class RMAppImpl implements RMApp,
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppSavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
- RMAppState.FINISHED, RMAppState.FAILED, RMAppState.KILLED,
- RMAppState.FINAL_SAVING),
+ RMAppState.RUNNING, RMAppState.FINISHED, RMAppState.FAILED,
+ RMAppState.KILLED, RMAppState.FINAL_SAVING),
RMAppEventType.RECOVER, new RMAppRecoveredTransition())
.addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, RMAppEventType.KILL,
new FinalSavingTransition(
@@ -611,11 +611,11 @@ public class RMAppImpl implements RMApp,
this.diagnostics.append(appState.getDiagnostics());
this.storedFinishTime = appState.getFinishTime();
this.startTime = appState.getStartTime();
+
for(int i=0; i<appState.getAttemptCount(); ++i) {
// create attempt
createNewAttempt(false);
- // recover attempt
- ((RMAppAttemptImpl) currentAttempt).recover(state);
+ ((RMAppAttemptImpl)this.currentAttempt).recover(state);
}
}
@@ -656,30 +656,35 @@ public class RMAppImpl implements RMApp,
};
}
+ @SuppressWarnings("unchecked")
private static final class RMAppRecoveredTransition implements
MultipleArcTransition<RMAppImpl, RMAppEvent, RMAppState> {
+
@Override
public RMAppState transition(RMAppImpl app, RMAppEvent event) {
- if (app.recoveredFinalState != null) {
- FINAL_TRANSITION.transition(app, event);
- return app.recoveredFinalState;
- }
- // Directly call AttemptFailedTransition, since now we deem that an
- // application fails because of RM restart as a normal AM failure.
-
- // Do not recover unmanaged applications since current recovery
- // mechanism of restarting attempts does not work for them.
- // This will need to be changed in work preserving recovery in which
- // RM will re-connect with the running AM's instead of restarting them
-
- // In work-preserve restart, if attemptCount == maxAttempts, the job still
- // needs to be recovered because the last attempt may still be running.
-
- // As part of YARN-1210, we may return ACCECPTED state waiting for AM to
- // reregister or fail and remove the following code.
- return new AttemptFailedTransition(RMAppState.SUBMITTED).transition(app,
- event);
+ if (app.attempts.isEmpty()) {
+ // Saved application was not running any attempts.
+ app.createNewAttempt(true);
+ return RMAppState.SUBMITTED;
+ } else {
+ /*
+ * If last attempt recovered final state is null .. it means attempt
+ * was started but AM container may or may not have started / finished.
+ * Therefore we should wait for it to finish.
+ */
+ for (RMAppAttempt attempt : app.getAppAttempts().values()) {
+ app.dispatcher.getEventHandler().handle(
+ new RMAppAttemptEvent(attempt.getAppAttemptId(),
+ RMAppAttemptEventType.RECOVER));
+ }
+ if (app.recoveredFinalState != null) {
+ FINAL_TRANSITION.transition(app, event);
+ return app.recoveredFinalState;
+ } else {
+ return RMAppState.RUNNING;
+ }
+ }
}
}
@@ -1017,4 +1022,10 @@ public class RMAppImpl implements RMApp,
throw new YarnRuntimeException("Unknown state passed!");
}
}
+
+ public static boolean isAppInFinalState(RMApp rmApp) {
+ RMAppState appState = rmApp.getState();
+ return appState == RMAppState.FAILED || appState == RMAppState.FINISHED
+ || appState == RMAppState.KILLED;
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java Tue Nov 19 17:26:23 2013
@@ -68,11 +68,14 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.ApplicationState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFailedAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppFinishedAttemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.event.RMAppAttemptLaunchFailedEvent;
@@ -179,7 +182,7 @@ public class RMAppAttemptImpl implements
new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
.addTransition( RMAppAttemptState.NEW,
EnumSet.of(RMAppAttemptState.FINISHED, RMAppAttemptState.KILLED,
- RMAppAttemptState.FAILED, RMAppAttemptState.RECOVERED),
+ RMAppAttemptState.FAILED, RMAppAttemptState.LAUNCHED),
RMAppAttemptEventType.RECOVER, new AttemptRecoveredTransition())
// Transitions from SUBMITTED state
@@ -386,25 +389,6 @@ public class RMAppAttemptImpl implements
RMAppAttemptEventType.UNREGISTERED,
RMAppAttemptEventType.KILL,
RMAppAttemptEventType.STATUS_UPDATE))
-
- // Transitions from RECOVERED State
- .addTransition(
- RMAppAttemptState.RECOVERED,
- RMAppAttemptState.RECOVERED,
- EnumSet.of(RMAppAttemptEventType.START,
- RMAppAttemptEventType.APP_ACCEPTED,
- RMAppAttemptEventType.APP_REJECTED,
- RMAppAttemptEventType.EXPIRE,
- RMAppAttemptEventType.LAUNCHED,
- RMAppAttemptEventType.LAUNCH_FAILED,
- RMAppAttemptEventType.REGISTERED,
- RMAppAttemptEventType.CONTAINER_ALLOCATED,
- RMAppAttemptEventType.CONTAINER_ACQUIRED,
- RMAppAttemptEventType.ATTEMPT_NEW_SAVED,
- RMAppAttemptEventType.CONTAINER_FINISHED,
- RMAppAttemptEventType.UNREGISTERED,
- RMAppAttemptEventType.KILL,
- RMAppAttemptEventType.STATUS_UPDATE))
.installTopology();
public RMAppAttemptImpl(ApplicationAttemptId appAttemptId,
@@ -694,8 +678,6 @@ public class RMAppAttemptImpl implements
this.proxiedTrackingUrl = generateProxyUriWithScheme(originalTrackingUrl);
this.finalStatus = attemptState.getFinalApplicationStatus();
this.startTime = attemptState.getStartTime();
- handle(new RMAppAttemptEvent(getAppAttemptId(),
- RMAppAttemptEventType.RECOVER));
}
private void recoverAppAttemptCredentials(Credentials appAttemptTokens)
@@ -865,11 +847,38 @@ public class RMAppAttemptImpl implements
@Override
public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
RMAppAttemptEvent event) {
+ LOG.info("Recovering attempt : recoverdFinalState :"
+ + appAttempt.recoveredFinalState);
if (appAttempt.recoveredFinalState != null) {
appAttempt.progress = 1.0f;
+ RMApp rmApp =appAttempt.rmContext.getRMApps().get(
+ appAttempt.getAppAttemptId().getApplicationId());
+ // Replay the attempt's final transition only if this is the current
+ // (last) attempt and the application itself is not in a final state.
+ if (rmApp.getCurrentAppAttempt() == appAttempt
+ && !RMAppImpl.isAppInFinalState(rmApp)) {
+ (new BaseFinalTransition(appAttempt.recoveredFinalState)).transition(
+ appAttempt, event);
+ }
return appAttempt.recoveredFinalState;
} else {
- return RMAppAttemptState.RECOVERED;
+ /*
+ * Since the application attempt's final state was not saved, the AM
+ * container (of the previous attempt) must be in one of these states:
+ * 1) The AM container may not have been launched (the RM failed right
+ * before launching it).
+ * 2) The AM container was successfully launched, but may or may not have
+ * registered / unregistered.
+ * In either case we wait (by moving the attempt into the LAUNCHED
+ * state) and mark this attempt failed (assuming a non-work-preserving
+ * restart) only after:
+ * 1) the NodeManager, while re-registering, heartbeats back saying the
+ * AM container has finished; OR
+ * 2) the AMLivelinessMonitor expires this attempt (when the AM doesn't
+ * heartbeat back).
+ */
+ (new AMLaunchedTransition()).transition(appAttempt, event);
+ return RMAppAttemptState.LAUNCHED;
}
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptState.java Tue Nov 19 17:26:23 2013
@@ -20,6 +20,5 @@ package org.apache.hadoop.yarn.server.re
public enum RMAppAttemptState {
NEW, SUBMITTED, SCHEDULED, ALLOCATED, LAUNCHED, FAILED, RUNNING, FINISHING,
- FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, RECOVERED,
- FINAL_SAVING
+ FINISHED, KILLED, ALLOCATED_SAVING, LAUNCHED_UNMANAGED_SAVING, FINAL_SAVING
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java Tue Nov 19 17:26:23 2013
@@ -34,6 +34,10 @@ import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.logging.Log;
@@ -48,10 +52,15 @@ import org.apache.hadoop.service.Abstrac
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.AbstractEvent;
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Service to renew application delegation tokens.
@@ -72,7 +81,8 @@ public class DelegationTokenRenewer exte
// delegation token canceler thread
private DelegationTokenCancelThread dtCancelThread =
new DelegationTokenCancelThread();
-
+ private ThreadPoolExecutor renewerService;
+
// managing the list of tokens using Map
// appId=>List<tokens>
private Set<DelegationTokenToRenew> delegationTokens =
@@ -84,9 +94,9 @@ public class DelegationTokenRenewer exte
private long tokenRemovalDelayMs;
private Thread delayedRemovalThread;
- private boolean isServiceStarted = false;
- private List<DelegationTokenToRenew> pendingTokenForRenewal =
- new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
+ private ReadWriteLock serviceStateLock = new ReentrantReadWriteLock();
+ private volatile boolean isServiceStarted;
+ private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
private boolean tokenKeepAliveEnabled;
@@ -102,9 +112,27 @@ public class DelegationTokenRenewer exte
this.tokenRemovalDelayMs =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
+ renewerService = createNewThreadPoolService(conf);
+ pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
super.serviceInit(conf);
}
+ protected ThreadPoolExecutor createNewThreadPoolService(Configuration conf) {
+ int nThreads = conf.getInt(
+ YarnConfiguration.RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT,
+ YarnConfiguration.DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT);
+
+ ThreadFactory tf = new ThreadFactoryBuilder()
+ .setNameFormat("DelegationTokenRenewer #%d")
+ .build();
+ ThreadPoolExecutor pool =
+ new ThreadPoolExecutor((5 < nThreads ? 5 : nThreads), nThreads, 3L,
+ TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
+ pool.setThreadFactory(tf);
+ pool.allowCoreThreadTimeOut(true);
+ return pool;
+ }
+
@Override
protected void serviceStart() throws Exception {
dtCancelThread.start();
@@ -119,21 +147,36 @@ public class DelegationTokenRenewer exte
RMDelegationTokenIdentifier.Renewer.setSecretManager(
rmContext.getRMDelegationTokenSecretManager(),
rmContext.getClientRMService().getBindAddress());
- // Delegation token renewal is delayed until ClientRMService starts. As
- // it is required to short circuit the token renewal calls.
+ serviceStateLock.writeLock().lock();
isServiceStarted = true;
- renewIfServiceIsStarted(pendingTokenForRenewal);
- pendingTokenForRenewal.clear();
+ serviceStateLock.writeLock().unlock();
+ while(!pendingEventQueue.isEmpty()) {
+ processDelegationTokenRewewerEvent(pendingEventQueue.take());
+ }
super.serviceStart();
}
+ private void processDelegationTokenRewewerEvent(
+ DelegationTokenRenewerEvent evt) {
+ serviceStateLock.readLock().lock();
+ try {
+ if (isServiceStarted) {
+ renewerService.execute(new DelegationTokenRenewerRunnable(evt));
+ } else {
+ pendingEventQueue.add(evt);
+ }
+ } finally {
+ serviceStateLock.readLock().unlock();
+ }
+ }
+
@Override
protected void serviceStop() {
if (renewalTimer != null) {
renewalTimer.cancel();
}
delegationTokens.clear();
-
+ this.renewerService.shutdown();
dtCancelThread.interrupt();
try {
dtCancelThread.join(1000);
@@ -290,47 +333,50 @@ public class DelegationTokenRenewer exte
* @throws IOException
*/
public void addApplication(
- ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd)
+ ApplicationId applicationId, Credentials ts, boolean shouldCancelAtEnd,
+ boolean isApplicationRecovered) {
+ processDelegationTokenRewewerEvent(new DelegationTokenRenewerAppSubmitEvent(
+ applicationId, ts,
+ shouldCancelAtEnd, isApplicationRecovered));
+ }
+
+ private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
throws IOException {
+ ApplicationId applicationId = evt.getApplicationId();
+ Credentials ts = evt.getCredentials();
+ boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
if (ts == null) {
- return; //nothing to add
+ return; // nothing to add
}
-
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Registering tokens for renewal for:" +
+ LOG.debug("Registering tokens for renewal for:" +
" appId = " + applicationId);
}
-
- Collection <Token<?>> tokens = ts.getAllTokens();
+
+ Collection<Token<?>> tokens = ts.getAllTokens();
long now = System.currentTimeMillis();
-
+
// find tokens for renewal, but don't add timers until we know
// all renewable tokens are valid
// At RM restart it is safe to assume that all the previously added tokens
// are valid
List<DelegationTokenToRenew> tokenList =
new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
- for(Token<?> token : tokens) {
+ for (Token<?> token : tokens) {
if (token.isManaged()) {
tokenList.add(new DelegationTokenToRenew(applicationId,
token, getConfig(), now, shouldCancelAtEnd));
}
}
- if (!tokenList.isEmpty()){
- renewIfServiceIsStarted(tokenList);
- }
- }
-
- protected void renewIfServiceIsStarted(List<DelegationTokenToRenew> dtrs)
- throws IOException {
- if (isServiceStarted) {
+ if (!tokenList.isEmpty()) {
// Renewing token and adding it to timer calls are separated purposefully
// If user provides incorrect token then it should not be added for
// renewal.
- for (DelegationTokenToRenew dtr : dtrs) {
+ for (DelegationTokenToRenew dtr : tokenList) {
renewToken(dtr);
}
- for (DelegationTokenToRenew dtr : dtrs) {
+ for (DelegationTokenToRenew dtr : tokenList) {
addTokenToList(dtr);
setTimerForTokenRenewal(dtr);
if (LOG.isDebugEnabled()) {
@@ -338,11 +384,9 @@ public class DelegationTokenRenewer exte
+ dtr.token.getService() + " for appId = " + dtr.applicationId);
}
}
- } else {
- pendingTokenForRenewal.addAll(dtrs);
}
}
-
+
/**
* Task - to renew a token
*
@@ -449,14 +493,20 @@ public class DelegationTokenRenewer exte
* @param applicationId completed application
*/
public void applicationFinished(ApplicationId applicationId) {
+ processDelegationTokenRewewerEvent(new DelegationTokenRenewerEvent(
+ applicationId,
+ DelegationTokenRenewerEventType.FINISH_APPLICATION));
+ }
+
+ private void handleAppFinishEvent(DelegationTokenRenewerEvent evt) {
if (!tokenKeepAliveEnabled) {
- removeApplicationFromRenewal(applicationId);
+ removeApplicationFromRenewal(evt.getApplicationId());
} else {
- delayedRemovalMap.put(applicationId, System.currentTimeMillis()
+ delayedRemovalMap.put(evt.getApplicationId(), System.currentTimeMillis()
+ tokenRemovalDelayMs);
}
}
-
+
/**
* Add a list of applications to the keep alive list. If an appId already
* exists, update it's keep-alive time.
@@ -546,4 +596,111 @@ public class DelegationTokenRenewer exte
public void setRMContext(RMContext rmContext) {
this.rmContext = rmContext;
}
+
+ /*
+ * Each instance processes one event on the renewer thread pool. This is
+ * done so that token renewal during application submission and token
+ * removal when an application finishes happen asynchronously, off the
+ * caller's thread.
+ */
+ private final class DelegationTokenRenewerRunnable
+ implements Runnable {
+
+ private DelegationTokenRenewerEvent evt;
+
+ public DelegationTokenRenewerRunnable(DelegationTokenRenewerEvent evt) {
+ this.evt = evt;
+ }
+
+ @Override
+ public void run() {
+ if (evt instanceof DelegationTokenRenewerAppSubmitEvent) {
+ DelegationTokenRenewerAppSubmitEvent appSubmitEvt =
+ (DelegationTokenRenewerAppSubmitEvent) evt;
+ handleDTRenewerAppSubmitEvent(appSubmitEvt);
+ } else if (evt.getType().equals(
+ DelegationTokenRenewerEventType.FINISH_APPLICATION)) {
+ DelegationTokenRenewer.this.handleAppFinishEvent(evt);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void handleDTRenewerAppSubmitEvent(
+ DelegationTokenRenewerAppSubmitEvent event) {
+ /*
+ * For applications submitted with delegation tokens we are not submitting
+ * the application to scheduler from RMAppManager. Instead we are doing
+ * it from here. The primary goal is to make token renewal as a part of
+ * application submission asynchronous so that client thread is not
+ * blocked during app submission.
+ */
+ try {
+ // Setup tokens for renewal
+ DelegationTokenRenewer.this.handleAppSubmitEvent(event);
+ rmContext.getDispatcher().getEventHandler()
+ .handle(new RMAppEvent(event.getApplicationId(),
+ event.isApplicationRecovered() ? RMAppEventType.RECOVER
+ : RMAppEventType.START));
+ } catch (Throwable t) {
+ LOG.warn(
+ "Unable to add the application to the delegation token renewer.",
+ t);
+ // Sending APP_REJECTED is fine, since we assume that the
+ // RMApp is in NEW state and thus we haven't yet informed the
+ // Scheduler about the existence of the application
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppRejectedEvent(event.getApplicationId(), t.getMessage()));
+ }
+ }
+ }
+
+ class DelegationTokenRenewerAppSubmitEvent extends
+ DelegationTokenRenewerEvent {
+
+ private Credentials credentials;
+ private boolean shouldCancelAtEnd;
+ private boolean isAppRecovered;
+
+ public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
+ Credentials credentails, boolean shouldCancelAtEnd,
+ boolean isApplicationRecovered) {
+ super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
+ this.credentials = credentails;
+ this.shouldCancelAtEnd = shouldCancelAtEnd;
+ this.isAppRecovered = isApplicationRecovered;
+ }
+
+ public Credentials getCredentials() {
+ return credentials;
+ }
+
+ public boolean shouldCancelAtEnd() {
+ return shouldCancelAtEnd;
+ }
+
+ public boolean isApplicationRecovered() {
+ return isAppRecovered;
+ }
+ }
+
+ enum DelegationTokenRenewerEventType {
+ VERIFY_AND_START_APPLICATION,
+ FINISH_APPLICATION
+ }
+
+ class DelegationTokenRenewerEvent extends
+ AbstractEvent<DelegationTokenRenewerEventType> {
+
+ private ApplicationId appId;
+
+ public DelegationTokenRenewerEvent(ApplicationId appId,
+ DelegationTokenRenewerEventType type) {
+ super(type);
+ this.appId = appId;
+ }
+
+ public ApplicationId getApplicationId() {
+ return appId;
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java?rev=1543510&r1=1543509&r2=1543510&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java Tue Nov 19 17:26:23 2013
@@ -249,7 +249,8 @@ class CapacitySchedulerPage extends RmVi
_("$(function() {",
" $('#cs a span').addClass('ui-corner-all').css('position', 'absolute');",
" $('#cs').bind('loaded.jstree', function (e, data) {",
- " data.inst.open_node('#pq', true);",
+ " var callback = { call:reopenQueryNodes }",
+ " data.inst.open_node('#pq', callback);",
" }).",
" jstree({",
" core: { animation: 188, html_titles: true },",
@@ -265,7 +266,8 @@ class CapacitySchedulerPage extends RmVi
" $('#apps').dataTable().fnFilter(q, 3, true);",
" });",
" $('#cs').show();",
- "});")._();
+ "});")._().
+ _(SchedulerPageUtil.QueueBlockUtil.class);
}
@Override protected Class<? extends SubView> content() {