You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/12/14 20:58:01 UTC
svn commit: r1214432 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/
hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/
hadoop-...
Author: vinodkv
Date: Wed Dec 14 19:58:00 2011
New Revision: 1214432
URL: http://svn.apache.org/viewvc?rev=1214432&view=rev
Log:
MAPREDUCE-3398. Fixed log aggregation to work correctly in secure mode. Contributed by Siddharth Seth.
svn merge -c 1214429 --ignore-ancestry ../../trunk/
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Wed Dec 14 19:58:00 2011
@@ -242,6 +242,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3557. MR1 test fail to compile because of missing hadoop-archives
dependency. (tucu)
+ MAPREDUCE-3398. Fixed log aggregation to work correctly in secure mode.
+ (Siddharth Seth via vinodkv)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Wed Dec 14 19:58:00 2011
@@ -91,12 +91,7 @@ public class YarnConfiguration extends C
public static final String RM_CLIENT_THREAD_COUNT =
RM_PREFIX + "client.thread-count";
public static final int DEFAULT_RM_CLIENT_THREAD_COUNT = 10;
-
- /** The expiry interval for application master reporting.*/
- public static final String RM_AM_EXPIRY_INTERVAL_MS =
- RM_PREFIX + "am.liveness-monitor.expiry-interval-ms";
- public static final int DEFAULT_RM_AM_EXPIRY_INTERVAL_MS = 600000;
-
+
/** The Kerberos principal for the resource manager.*/
public static final String RM_PRINCIPAL =
RM_PREFIX + "principal";
@@ -126,7 +121,17 @@ public class YarnConfiguration extends C
public static final int DEFAULT_RM_RESOURCE_TRACKER_PORT = 8025;
public static final String DEFAULT_RM_RESOURCE_TRACKER_ADDRESS =
"0.0.0.0:" + DEFAULT_RM_RESOURCE_TRACKER_PORT;
-
+
+ /** The expiry interval for application master reporting.*/
+ public static final String RM_AM_EXPIRY_INTERVAL_MS =
+ YARN_PREFIX + "am.liveness-monitor.expiry-interval-ms";
+ public static final int DEFAULT_RM_AM_EXPIRY_INTERVAL_MS = 600000;
+
+ /** How long to wait until a node manager is considered dead.*/
+ public static final String RM_NM_EXPIRY_INTERVAL_MS =
+ YARN_PREFIX + "nm.liveness-monitor.expiry-interval-ms";
+ public static final int DEFAULT_RM_NM_EXPIRY_INTERVAL_MS = 600000;
+
/** Are acls enabled.*/
public static final String YARN_ACL_ENABLE =
YARN_PREFIX + "acl.enable";
@@ -160,12 +165,7 @@ public class YarnConfiguration extends C
/** The keytab for the resource manager.*/
public static final String RM_KEYTAB =
RM_PREFIX + "keytab";
-
- /** How long to wait until a node manager is considered dead.*/
- public static final String RM_NM_EXPIRY_INTERVAL_MS =
- RM_PREFIX + "nm.liveness-monitor.expiry-interval-ms";
- public static final int DEFAULT_RM_NM_EXPIRY_INTERVAL_MS = 600000;
-
+
/** How long to wait until a container is considered dead.*/
public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS =
RM_PREFIX + "rm.container-allocation.expiry-interval-ms";
@@ -293,10 +293,16 @@ public class YarnConfiguration extends C
public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
+ /** Interval at which the delayed token removal thread runs */
+ public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
+ RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
+ public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
+ 30000l;
+
/** Whether to enable log aggregation */
- public static final String NM_LOG_AGGREGATION_ENABLED = NM_PREFIX
+ public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
+ "log-aggregation-enable";
- public static final boolean DEFAULT_NM_LOG_AGGREGATION_ENABLED = false;
+ public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false;
/**
* Number of seconds to retain logs on the NodeManager. Only applicable if Log
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java Wed Dec 14 19:58:00 2011
@@ -53,8 +53,8 @@ public class AggregatedLogsBlock extends
logEntity = containerId.toString();
}
- if (!conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
- YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
+ if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
html.h1()
._("Aggregation is not enabled. Try the nodemanager at " + nodeId)
._();
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java Wed Dec 14 19:58:00 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.ap
import java.util.List;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -33,6 +34,9 @@ public interface NodeStatus {
public abstract void setContainersStatuses(
List<ContainerStatus> containersStatuses);
+ public abstract List<ApplicationId> getKeepAliveApplications();
+ public abstract void setKeepAliveApplications(List<ApplicationId> appIds);
+
NodeHealthStatus getNodeHealthStatus();
void setNodeHealthStatus(NodeHealthStatus healthStatus);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java Wed Dec 14 19:58:00 2011
@@ -23,13 +23,16 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeHealthStatusPBImpl;
import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeHealthStatusProto;
import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@@ -37,7 +40,9 @@ import org.apache.hadoop.yarn.proto.Yarn
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
-public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements NodeStatus {
+
+public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements
+ NodeStatus {
NodeStatusProto proto = NodeStatusProto.getDefaultInstance();
NodeStatusProto.Builder builder = null;
boolean viaProto = false;
@@ -45,6 +50,7 @@ public class NodeStatusPBImpl extends Pr
private NodeId nodeId = null;
private List<ContainerStatus> containers = null;
private NodeHealthStatus nodeHealthStatus = null;
+ private List<ApplicationId> keepAliveApplications = null;
public NodeStatusPBImpl() {
builder = NodeStatusProto.newBuilder();
@@ -55,15 +61,14 @@ public class NodeStatusPBImpl extends Pr
viaProto = true;
}
- public NodeStatusProto getProto() {
-
- mergeLocalToProto();
+ public synchronized NodeStatusProto getProto() {
+ mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
- private void mergeLocalToBuilder() {
+ private synchronized void mergeLocalToBuilder() {
if (this.nodeId != null) {
builder.setNodeId(convertToProtoFormat(this.nodeId));
}
@@ -73,9 +78,12 @@ public class NodeStatusPBImpl extends Pr
if (this.nodeHealthStatus != null) {
builder.setNodeHealthStatus(convertToProtoFormat(this.nodeHealthStatus));
}
+ if (this.keepAliveApplications != null) {
+ addKeepAliveApplicationsToProto();
+ }
}
- private void mergeLocalToProto() {
+ private synchronized void mergeLocalToProto() {
if (viaProto)
maybeInitBuilder();
mergeLocalToBuilder();
@@ -84,14 +92,14 @@ public class NodeStatusPBImpl extends Pr
viaProto = true;
}
- private void maybeInitBuilder() {
+ private synchronized void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = NodeStatusProto.newBuilder(proto);
}
viaProto = false;
}
- private void addContainersToProto() {
+ private synchronized void addContainersToProto() {
maybeInitBuilder();
builder.clearContainersStatuses();
if (containers == null)
@@ -124,19 +132,53 @@ public class NodeStatusPBImpl extends Pr
};
builder.addAllContainersStatuses(iterable);
}
+
+ private synchronized void addKeepAliveApplicationsToProto() {
+ maybeInitBuilder();
+ builder.clearKeepAliveApplications();
+ if (keepAliveApplications == null)
+ return;
+ Iterable<ApplicationIdProto> iterable = new Iterable<ApplicationIdProto>() {
+ @Override
+ public Iterator<ApplicationIdProto> iterator() {
+ return new Iterator<ApplicationIdProto>() {
+
+ Iterator<ApplicationId> iter = keepAliveApplications.iterator();
+
+ @Override
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+
+ @Override
+ public ApplicationIdProto next() {
+ return convertToProtoFormat(iter.next());
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+
+ }
+ };
+
+ }
+ };
+ builder.addAllKeepAliveApplications(iterable);
+ }
@Override
- public int getResponseId() {
+ public synchronized int getResponseId() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
return p.getResponseId();
}
@Override
- public void setResponseId(int responseId) {
+ public synchronized void setResponseId(int responseId) {
maybeInitBuilder();
builder.setResponseId(responseId);
}
@Override
- public NodeId getNodeId() {
+ public synchronized NodeId getNodeId() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
if (this.nodeId != null) {
return this.nodeId;
@@ -148,8 +190,9 @@ public class NodeStatusPBImpl extends Pr
return this.nodeId;
}
+
@Override
- public void setNodeId(NodeId nodeId) {
+ public synchronized void setNodeId(NodeId nodeId) {
maybeInitBuilder();
if (nodeId == null)
builder.clearNodeId();
@@ -158,20 +201,35 @@ public class NodeStatusPBImpl extends Pr
}
@Override
- public List<ContainerStatus> getContainersStatuses() {
+ public synchronized List<ContainerStatus> getContainersStatuses() {
initContainers();
return this.containers;
}
@Override
- public void setContainersStatuses(List<ContainerStatus> containers) {
+ public synchronized void setContainersStatuses(
+ List<ContainerStatus> containers) {
if (containers == null) {
builder.clearContainersStatuses();
}
this.containers = containers;
}
+
+ @Override
+ public synchronized List<ApplicationId> getKeepAliveApplications() {
+ initKeepAliveApplications();
+ return this.keepAliveApplications;
+ }
+
+ @Override
+ public synchronized void setKeepAliveApplications(List<ApplicationId> appIds) {
+ if (appIds == null) {
+ builder.clearKeepAliveApplications();
+ }
+ this.keepAliveApplications = appIds;
+ }
- private void initContainers() {
+ private synchronized void initContainers() {
if (this.containers != null) {
return;
}
@@ -185,8 +243,22 @@ public class NodeStatusPBImpl extends Pr
}
+ private synchronized void initKeepAliveApplications() {
+ if (this.keepAliveApplications != null) {
+ return;
+ }
+ NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
+ List<ApplicationIdProto> list = p.getKeepAliveApplicationsList();
+ this.keepAliveApplications = new ArrayList<ApplicationId>();
+
+ for (ApplicationIdProto c : list) {
+ this.keepAliveApplications.add(convertFromProtoFormat(c));
+ }
+
+ }
+
@Override
- public NodeHealthStatus getNodeHealthStatus() {
+ public synchronized NodeHealthStatus getNodeHealthStatus() {
NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
if (nodeHealthStatus != null) {
return nodeHealthStatus;
@@ -199,7 +271,7 @@ public class NodeStatusPBImpl extends Pr
}
@Override
- public void setNodeHealthStatus(NodeHealthStatus healthStatus) {
+ public synchronized void setNodeHealthStatus(NodeHealthStatus healthStatus) {
maybeInitBuilder();
if (healthStatus == null) {
builder.clearNodeHealthStatus();
@@ -231,4 +303,12 @@ public class NodeStatusPBImpl extends Pr
private ContainerStatusProto convertToProtoFormat(ContainerStatus c) {
return ((ContainerStatusPBImpl)c).getProto();
}
-}
+
+ private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto c) {
+ return new ApplicationIdPBImpl(c);
+ }
+
+ private ApplicationIdProto convertToProtoFormat(ApplicationId c) {
+ return ((ApplicationIdPBImpl)c).getProto();
+ }
+}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto Wed Dec 14 19:58:00 2011
@@ -34,6 +34,7 @@ message NodeStatusProto {
optional int32 response_id = 2;
repeated ContainerStatusProto containersStatuses = 3;
optional NodeHealthStatusProto nodeHealthStatus = 4;
+ repeated ApplicationIdProto keep_alive_applications = 5;
}
message RegistrationResponseProto {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml Wed Dec 14 19:58:00 2011
@@ -72,7 +72,7 @@
<property>
<description>The expiry interval for application master reporting.</description>
- <name>yarn.resourcemanager.am.liveness-monitor.expiry-interval-ms</name>
+ <name>yarn.am.liveness-monitor.expiry-interval-ms</name>
<value>600000</value>
</property>
@@ -155,7 +155,7 @@
<property>
<description>How long to wait until a node manager is considered dead.</description>
- <name>yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms</name>
+ <name>yarn.nm.liveness-monitor.expiry-interval-ms</name>
<value>600000</value>
</property>
@@ -210,6 +210,12 @@
<value>10000</value>
</property>
+ <property>
+ <description>Interval at which the delayed token removal thread runs</description>
+ <name>yarn.resourcemanager.delayed.delegation-token.removal-interval-ms</name>
+ <value>30000</value>
+ </property>
+
<!-- Node Manager Configs -->
<property>
<description>address of node manager IPC.</description>
@@ -304,7 +310,7 @@
<property>
<description>Whether to enable log aggregation</description>
- <name>yarn.nodemanager.log-aggregation-enable</name>
+ <name>yarn.log-aggregation-enable</name>
<value>false</value>
</property>
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Dec 14 19:58:00 2011
@@ -20,8 +20,12 @@ package org.apache.hadoop.yarn.server.no
import java.net.InetSocketAddress;
import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.Random;
import java.util.Map.Entry;
import org.apache.avro.AvroRuntimeException;
@@ -56,6 +60,7 @@ import org.apache.hadoop.yarn.server.nod
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.AbstractService;
+
public class NodeStatusUpdaterImpl extends AbstractService implements
NodeStatusUpdater {
@@ -76,6 +81,12 @@ public class NodeStatusUpdaterImpl exten
private byte[] secretKeyBytes = new byte[0];
private boolean isStopped;
private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ private boolean tokenKeepAliveEnabled;
+ private long tokenRemovalDelayMs;
+ /** Keeps track of when the next keep alive request should be sent for an app*/
+ private Map<ApplicationId, Long> appTokenKeepAliveMap =
+ new HashMap<ApplicationId, Long>();
+ private Random keepAliveDelayRandom = new Random();
private final NodeHealthCheckerService healthChecker;
private final NodeManagerMetrics metrics;
@@ -103,6 +114,13 @@ public class NodeStatusUpdaterImpl exten
this.totalResource = recordFactory.newRecordInstance(Resource.class);
this.totalResource.setMemory(memoryMb);
metrics.addResource(totalResource);
+ this.tokenKeepAliveEnabled =
+ conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
+ && isSecurityEnabled();
+ this.tokenRemovalDelayMs =
+ conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
super.init(conf);
}
@@ -139,6 +157,10 @@ public class NodeStatusUpdaterImpl exten
super.stop();
}
+ protected boolean isSecurityEnabled() {
+ return UserGroupInformation.isSecurityEnabled();
+ }
+
protected ResourceTracker getRMClient() {
Configuration conf = getConfig();
YarnRPC rpc = YarnRPC.create(conf);
@@ -188,6 +210,29 @@ public class NodeStatusUpdaterImpl exten
return this.secretKeyBytes.clone();
}
+ private List<ApplicationId> createKeepAliveApplicationList() {
+ if (!tokenKeepAliveEnabled) {
+ return Collections.emptyList();
+ }
+
+ List<ApplicationId> appList = new ArrayList<ApplicationId>();
+ for (Iterator<Entry<ApplicationId, Long>> i =
+ this.appTokenKeepAliveMap.entrySet().iterator(); i.hasNext();) {
+ Entry<ApplicationId, Long> e = i.next();
+ ApplicationId appId = e.getKey();
+ Long nextKeepAlive = e.getValue();
+ if (!this.context.getApplications().containsKey(appId)) {
+ // Remove if the application has finished.
+ i.remove();
+ } else if (System.currentTimeMillis() > nextKeepAlive) {
+ // KeepAlive list for the next hearbeat.
+ appList.add(appId);
+ trackAppForKeepAlive(appId);
+ }
+ }
+ return appList;
+ }
+
private NodeStatus getNodeStatus() {
NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
@@ -231,9 +276,29 @@ public class NodeStatusUpdaterImpl exten
}
nodeStatus.setNodeHealthStatus(nodeHealthStatus);
+ List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
+ nodeStatus.setKeepAliveApplications(keepAliveAppIds);
+
return nodeStatus;
}
+ private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
+ if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) {
+ for (ApplicationId appId : appIds) {
+ trackAppForKeepAlive(appId);
+ }
+ }
+ }
+
+ private void trackAppForKeepAlive(ApplicationId appId) {
+ // Next keepAlive request for app between 0.7 & 0.9 of when the token will
+ // likely expire.
+ long nextTime = System.currentTimeMillis()
+ + (long) (0.7 * tokenRemovalDelayMs + (0.2 * tokenRemovalDelayMs
+ * keepAliveDelayRandom.nextInt(100))/100);
+ appTokenKeepAliveMap.put(appId, nextTime);
+ }
+
@Override
public void sendOutofBandHeartBeat() {
synchronized (this.heartbeatMonitor) {
@@ -245,6 +310,7 @@ public class NodeStatusUpdaterImpl exten
new Thread("Node Status Updater") {
@Override
+ @SuppressWarnings("unchecked")
public void run() {
int lastHeartBeatID = 0;
while (!isStopped) {
@@ -284,6 +350,8 @@ public class NodeStatusUpdaterImpl exten
}
List<ApplicationId> appsToCleanup =
response.getApplicationsToCleanupList();
+ //Only start tracking for keepAlive on FINISH_APP
+ trackAppsForKeepAlive(appsToCleanup);
if (appsToCleanup.size() != 0) {
dispatcher.getEventHandler().handle(
new CMgrCompletedAppsEvent(appsToCleanup));
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Wed Dec 14 19:58:00 2011
@@ -192,8 +192,8 @@ public class ContainerManagerImpl extend
protected LogHandler createLogHandler(Configuration conf, Context context,
DeletionService deletionService) {
- if (conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
- YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
+ if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
return new LogAggregationService(this.dispatcher, context,
deletionService, dirsHandler);
} else {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Wed Dec 14 19:58:00 2011
@@ -170,6 +170,7 @@ public class AppLogAggregatorImpl implem
this.writer.closeWriter();
LOG.info("Finished aggregate log-file for app " + this.applicationId);
}
+
try {
userUgi.doAs(new PrivilegedExceptionAction<Object>() {
@Override
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java Wed Dec 14 19:58:00 2011
@@ -88,8 +88,8 @@ public class NMController extends Contro
containerId.getApplicationAttemptId().getApplicationId();
Application app = nmContext.getApplications().get(appId);
if (app == null
- && nmConf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
- YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
+ && nmConf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
String logServerUrl = nmConf.get(YarnConfiguration.YARN_LOG_SERVER_URL);
String redirectUrl = null;
if (logServerUrl == null || logServerUrl.isEmpty()) {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Wed Dec 14 19:58:00 2011
@@ -22,7 +22,9 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
@@ -56,6 +58,7 @@ import org.apache.hadoop.yarn.server.api
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
@@ -63,10 +66,12 @@ import org.apache.hadoop.yarn.server.sec
import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
import org.apache.hadoop.yarn.service.Service;
import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import static org.mockito.Mockito.mock;
public class TestNodeStatusUpdater {
@@ -216,7 +221,7 @@ public class TestNodeStatusUpdater {
HeartbeatResponse response = recordFactory
.newRecordInstance(HeartbeatResponse.class);
response.setResponseId(heartBeatID);
-
+
NodeHeartbeatResponse nhResponse = recordFactory
.newRecordInstance(NodeHeartbeatResponse.class);
nhResponse.setHeartbeatResponse(response);
@@ -241,6 +246,48 @@ public class TestNodeStatusUpdater {
return resourceTracker;
}
}
+
+ private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
+ public ResourceTracker resourceTracker;
+ private Context context;
+
+ public MyNodeStatusUpdater3(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
+ ContainerTokenSecretManager containerTokenSecretManager) {
+ super(context, dispatcher, healthChecker, metrics,
+ containerTokenSecretManager);
+ this.context = context;
+ this.resourceTracker = new MyResourceTracker3(this.context);
+ }
+
+ @Override
+ protected ResourceTracker getRMClient() {
+ return resourceTracker;
+ }
+
+ @Override
+ protected boolean isSecurityEnabled() {
+ return true;
+ }
+ }
+
+ private class MyNodeManager extends NodeManager {
+
+ private MyNodeStatusUpdater3 nodeStatusUpdater;
+ @Override
+ protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+ ContainerTokenSecretManager containerTokenSecretManager) {
+ this.nodeStatusUpdater =
+ new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics,
+ containerTokenSecretManager);
+ return this.nodeStatusUpdater;
+ }
+
+ protected MyNodeStatusUpdater3 getNodeStatusUpdater() {
+ return this.nodeStatusUpdater;
+ }
+ }
//
private class MyResourceTracker2 implements ResourceTracker {
@@ -276,6 +323,65 @@ public class TestNodeStatusUpdater {
}
}
+ private class MyResourceTracker3 implements ResourceTracker {
+ public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
+ public NodeAction registerNodeAction = NodeAction.NORMAL;
+ private Map<ApplicationId, List<Long>> keepAliveRequests =
+ new HashMap<ApplicationId, List<Long>>();
+ private ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+ private final Context context;
+
+ MyResourceTracker3(Context context) {
+ this.context = context;
+ }
+
+ @Override
+ public RegisterNodeManagerResponse registerNodeManager(
+ RegisterNodeManagerRequest request) throws YarnRemoteException {
+
+ RegisterNodeManagerResponse response =
+ recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
+ RegistrationResponse regResponse =
+ recordFactory.newRecordInstance(RegistrationResponse.class);
+ regResponse.setNodeAction(registerNodeAction);
+ response.setRegistrationResponse(regResponse);
+ return response;
+ }
+
+ @Override
+ public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+ throws YarnRemoteException {
+ LOG.info("Got heartBeatId: [" + heartBeatID +"]");
+ NodeStatus nodeStatus = request.getNodeStatus();
+ nodeStatus.setResponseId(heartBeatID++);
+ HeartbeatResponse response =
+ recordFactory.newRecordInstance(HeartbeatResponse.class);
+ response.setResponseId(heartBeatID);
+ response.setNodeAction(heartBeatNodeAction);
+
+ if (nodeStatus.getKeepAliveApplications() != null
+ && nodeStatus.getKeepAliveApplications().size() > 0) {
+ for (ApplicationId appId : nodeStatus.getKeepAliveApplications()) {
+ List<Long> list = keepAliveRequests.get(appId);
+ if (list == null) {
+ list = new LinkedList<Long>();
+ keepAliveRequests.put(appId, list);
+ }
+ list.add(System.currentTimeMillis());
+ }
+ }
+ if (heartBeatID == 2) {
+ LOG.info("Sending FINISH_APP for application: [" + appId + "]");
+ this.context.getApplications().put(appId, mock(Application.class));
+ response.addAllApplicationsToCleanup(Collections.singletonList(appId));
+ }
+ NodeHeartbeatResponse nhResponse =
+ recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
+ nhResponse.setHeartbeatResponse(response);
+ return nhResponse;
+ }
+ }
+
@Before
public void clearError() {
nmStartError = null;
@@ -456,6 +562,38 @@ public class TestNodeStatusUpdater {
verifyNodeStartFailure("Starting of RPC Server failed");
}
+ @Test
+ public void testApplicationKeepAlive() throws Exception {
+ MyNodeManager nm = new MyNodeManager();
+ try {
+ YarnConfiguration conf = createNMConfig();
+ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+ conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
+ 4000l);
+ nm.init(conf);
+ nm.start();
+ // HB 2 -> app cancelled by RM.
+ while (heartBeatID < 12) {
+ Thread.sleep(1000l);
+ }
+ MyResourceTracker3 rt =
+ (MyResourceTracker3) nm.getNodeStatusUpdater().getRMClient();
+ rt.context.getApplications().remove(rt.appId);
+ Assert.assertEquals(1, rt.keepAliveRequests.size());
+ int numKeepAliveRequests = rt.keepAliveRequests.get(rt.appId).size();
+ LOG.info("Number of Keep Alive Requests: [" + numKeepAliveRequests + "]");
+ Assert.assertTrue(numKeepAliveRequests == 2 || numKeepAliveRequests == 3);
+ while (heartBeatID < 20) {
+ Thread.sleep(1000l);
+ }
+ int numKeepAliveRequests2 = rt.keepAliveRequests.get(rt.appId).size();
+ Assert.assertEquals(numKeepAliveRequests, numKeepAliveRequests2);
+ } finally {
+ if (nm.getServiceState() == STATE.STARTED)
+ nm.stop();
+ }
+ }
+
private void verifyNodeStartFailure(String errMessage) {
YarnConfiguration conf = createNMConfig();
nm.init(conf);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java Wed Dec 14 19:58:00 2011
@@ -68,7 +68,7 @@ public class TestNonAggregatingLogHandle
+ localLogDirs[1].getAbsolutePath();
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
- conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
+ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l);
DrainDispatcher dispatcher = createDispatcher(conf);
@@ -142,7 +142,7 @@ public class TestNonAggregatingLogHandle
+ localLogDirs[1].getAbsolutePath();
conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
- conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
+ conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 10800l);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Wed Dec 14 19:58:00 2011
@@ -173,7 +173,7 @@ public class RMAppManager implements Eve
} else {
// Inform the DelegationTokenRenewer
if (UserGroupInformation.isSecurityEnabled()) {
- rmContext.getDelegationTokenRenewer().removeApplication(applicationId);
+ rmContext.getDelegationTokenRenewer().applicationFinished(applicationId);
}
completedApps.add(applicationId);
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Wed Dec 14 19:58:00 2011
@@ -272,7 +272,8 @@ public class ResourceTrackerService exte
// 4. Send status to RMNode, saving the latest response.
this.rmContext.getDispatcher().getEventHandler().handle(
new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
- remoteNodeStatus.getContainersStatuses(), latestResponse));
+ remoteNodeStatus.getContainersStatuses(),
+ remoteNodeStatus.getKeepAliveApplications(), latestResponse));
nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
return nodeHeartBeatResponse;
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Wed Dec 14 19:58:00 2011
@@ -414,7 +414,9 @@ public class RMNodeImpl implements RMNod
rmNode.context.getDispatcher().getEventHandler().handle(
new NodeUpdateSchedulerEvent(rmNode, newlyLaunchedContainers,
completedContainers));
-
+
+ rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
+ statusEvent.getKeepAliveAppIds());
return RMNodeState.RUNNING;
}
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java Wed Dec 14 19:58:00 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import java.util.List;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -28,15 +29,17 @@ import org.apache.hadoop.yarn.server.api
public class RMNodeStatusEvent extends RMNodeEvent {
private final NodeHealthStatus nodeHealthStatus;
- private List<ContainerStatus> containersCollection;
+ private final List<ContainerStatus> containersCollection;
private final HeartbeatResponse latestResponse;
+ private final List<ApplicationId> keepAliveAppIds;
public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
- List<ContainerStatus> collection,
+ List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
HeartbeatResponse latestResponse) {
super(nodeId, RMNodeEventType.STATUS_UPDATE);
this.nodeHealthStatus = nodeHealthStatus;
this.containersCollection = collection;
+ this.keepAliveAppIds = keepAliveAppIds;
this.latestResponse = latestResponse;
}
@@ -51,4 +54,8 @@ public class RMNodeStatusEvent extends R
public HeartbeatResponse getLatestResponse() {
return this.latestResponse;
}
-}
+
+ public List<ApplicationId> getKeepAliveAppIds() {
+ return this.keepAliveAppIds;
+ }
+}
\ No newline at end of file
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java Wed Dec 14 19:58:00 2011
@@ -20,14 +20,19 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
@@ -40,6 +45,7 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.service.AbstractService;
/**
@@ -65,7 +71,16 @@ public class DelegationTokenRenewer exte
// appId=>List<tokens>
private Set<DelegationTokenToRenew> delegationTokens =
Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
+
+ private final ConcurrentMap<ApplicationId, Long> delayedRemovalMap =
+ new ConcurrentHashMap<ApplicationId, Long>();
+ private long tokenRemovalDelayMs;
+
+ private Thread delayedRemovalThread;
+
+ private boolean tokenKeepAliveEnabled;
+
public DelegationTokenRenewer() {
super(DelegationTokenRenewer.class.getName());
}
@@ -73,6 +88,12 @@ public class DelegationTokenRenewer exte
@Override
public synchronized void init(Configuration conf) {
super.init(conf);
+ this.tokenKeepAliveEnabled =
+ conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+ YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
+ this.tokenRemovalDelayMs =
+ conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
}
@Override
@@ -81,6 +102,12 @@ public class DelegationTokenRenewer exte
dtCancelThread.start();
renewalTimer = new Timer(true);
+ if (tokenKeepAliveEnabled) {
+ delayedRemovalThread =
+ new Thread(new DelayedTokenRemovalRunnable(getConfig()),
+ "DelayedTokenCanceller");
+ delayedRemovalThread.start();
+ }
}
@Override
@@ -94,6 +121,14 @@ public class DelegationTokenRenewer exte
} catch (InterruptedException e) {
e.printStackTrace();
}
+ if (tokenKeepAliveEnabled && delayedRemovalThread != null) {
+ delayedRemovalThread.interrupt();
+ try {
+ delayedRemovalThread.join(1000);
+ } catch (InterruptedException e) {
+ LOG.info("Interrupted while joining on delayed removal thread.", e);
+ }
+ }
super.stop();
}
@@ -343,12 +378,38 @@ public class DelegationTokenRenewer exte
if(t.timerTask!=null)
t.timerTask.cancel();
}
-
+
/**
* Removing delegation token for completed applications.
* @param applicationId completed application
*/
- public void removeApplication(ApplicationId applicationId) {
+ public void applicationFinished(ApplicationId applicationId) {
+ if (!tokenKeepAliveEnabled) {
+ removeApplicationFromRenewal(applicationId);
+ } else {
+ delayedRemovalMap.put(applicationId, System.currentTimeMillis()
+ + tokenRemovalDelayMs);
+ }
+ }
+
+ /**
+ * Add a list of applications to the keep alive list. If an appId already
+ * exists, update it's keep-alive time.
+ *
+ * @param appIds
+ * the list of applicationIds to be kept alive.
+ *
+ */
+ public void updateKeepAliveApplications(List<ApplicationId> appIds) {
+ if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) {
+ for (ApplicationId appId : appIds) {
+ delayedRemovalMap.put(appId, System.currentTimeMillis()
+ + tokenRemovalDelayMs);
+ }
+ }
+ }
+
+ private void removeApplicationFromRenewal(ApplicationId applicationId) {
synchronized (delegationTokens) {
Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
while(it.hasNext()) {
@@ -371,4 +432,50 @@ public class DelegationTokenRenewer exte
}
}
}
+
+ /**
+ * Takes care of cancelling app delegation tokens after the configured
+ * cancellation delay, taking into consideration keep-alive requests.
+ *
+ */
+ private class DelayedTokenRemovalRunnable implements Runnable {
+
+ private long waitTimeMs;
+
+ DelayedTokenRemovalRunnable(Configuration conf) {
+ waitTimeMs =
+ conf.getLong(
+ YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS);
+ }
+
+ @Override
+ public void run() {
+ List<ApplicationId> toCancel = new ArrayList<ApplicationId>();
+ while (!Thread.currentThread().isInterrupted()) {
+ Iterator<Entry<ApplicationId, Long>> it =
+ delayedRemovalMap.entrySet().iterator();
+ toCancel.clear();
+ while (it.hasNext()) {
+ Entry<ApplicationId, Long> e = it.next();
+ if (e.getValue() < System.currentTimeMillis()) {
+ toCancel.add(e.getKey());
+ }
+ }
+ for (ApplicationId appId : toCancel) {
+ removeApplicationFromRenewal(appId);
+ delayedRemovalMap.remove(appId);
+ }
+ synchronized (this) {
+ try {
+ wait(waitTimeMs);
+ } catch (InterruptedException e) {
+ LOG.info("Delayed Deletion Thread Interrupted. Shutting it down");
+ return;
+ }
+ }
+ }
+ }
+ }
+
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java Wed Dec 14 19:58:00 2011
@@ -20,12 +20,12 @@ package org.apache.hadoop.yarn.server.re
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Collections;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -44,6 +44,7 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.delegation.DelegationKey;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.BuilderUtils;
import org.junit.After;
import org.junit.Before;
@@ -328,7 +329,7 @@ public class TestDelegationTokenRenewer
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
delegationTokenRenewer.addApplication(applicationId_1, ts, true);
- delegationTokenRenewer.removeApplication(applicationId_1);
+ delegationTokenRenewer.applicationFinished(applicationId_1);
numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
@@ -343,7 +344,7 @@ public class TestDelegationTokenRenewer
// also renewing of the cancelled token should fail
try {
token4.renew(conf);
- assertTrue("Renewal of canceled token didn't fail", false);
+ fail("Renewal of cancelled token should have failed");
} catch (InvalidToken ite) {
//expected
}
@@ -377,7 +378,7 @@ public class TestDelegationTokenRenewer
ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
delegationTokenRenewer.addApplication(applicationId_1, ts, false);
- delegationTokenRenewer.removeApplication(applicationId_1);
+ delegationTokenRenewer.applicationFinished(applicationId_1);
int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
try {
@@ -393,4 +394,123 @@ public class TestDelegationTokenRenewer
// been canceled
token1.renew(conf);
}
+
+ /**
+ * Basic idea of the test:
+ * 0. Setup token KEEP_ALIVE
+ * 1. create tokens.
+ * 2. register them for renewal - to be cancelled on app complete
+ * 3. Complete app.
+ * 4. Verify token is alive within the KEEP_ALIVE time
+ * 5. Verify token has been cancelled after the KEEP_ALIVE_TIME
+ * @throws IOException
+ * @throws URISyntaxException
+ */
+ @Test
+ public void testDTKeepAlive1 () throws Exception {
+ DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
+ Configuration lconf = new Configuration(conf);
+ lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+ //Keep tokens alive for 6 seconds.
+ lconf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 6000l);
+ //Try removing tokens every second.
+ lconf.setLong(
+ YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
+ 1000l);
+ localDtr.init(lconf);
+ localDtr.start();
+
+ MyFS dfs = (MyFS)FileSystem.get(lconf);
+ LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+lconf.hashCode());
+
+ Credentials ts = new Credentials();
+ // get the delegation tokens
+ MyToken token1 = dfs.getDelegationToken(new Text("user1"));
+
+ String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0";
+ ts.addToken(new Text(nn1), token1);
+
+ // register the tokens for renewal
+ ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
+ localDtr.addApplication(applicationId_0, ts, true);
+ localDtr.applicationFinished(applicationId_0);
+
+ Thread.sleep(3000l);
+
+ //Token should still be around. Renewal should not fail.
+ token1.renew(lconf);
+
+ //Allow the keepalive time to run out
+ Thread.sleep(6000l);
+
+ //The token should have been cancelled at this point. Renewal will fail.
+ try {
+ token1.renew(lconf);
+ fail("Renewal of cancelled token should have failed");
+ } catch (InvalidToken ite) {}
+ }
+
+ /**
+ * Basic idea of the test:
+ * 0. Setup token KEEP_ALIVE
+ * 1. create tokens.
+ * 2. register them for renewal - to be cancelled on app complete
+ * 3. Complete app.
+ * 4. Verify token is alive within the KEEP_ALIVE time
+ * 5. Send an explicity KEEP_ALIVE_REQUEST
+ * 6. Verify token KEEP_ALIVE time is renewed.
+ * 7. Verify token has been cancelled after the renewed KEEP_ALIVE_TIME.
+ * @throws IOException
+ * @throws URISyntaxException
+ */
+ @Test
+ public void testDTKeepAlive2() throws Exception {
+ DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
+ Configuration lconf = new Configuration(conf);
+ lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+ //Keep tokens alive for 6 seconds.
+ lconf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 6000l);
+ //Try removing tokens every second.
+ lconf.setLong(
+ YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
+ 1000l);
+ localDtr.init(lconf);
+ localDtr.start();
+
+ MyFS dfs = (MyFS)FileSystem.get(lconf);
+ LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+lconf.hashCode());
+
+ Credentials ts = new Credentials();
+ // get the delegation tokens
+ MyToken token1 = dfs.getDelegationToken(new Text("user1"));
+
+ String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0";
+ ts.addToken(new Text(nn1), token1);
+
+ // register the tokens for renewal
+ ApplicationId applicationId_0 = BuilderUtils.newApplicationId(0, 0);
+ localDtr.addApplication(applicationId_0, ts, true);
+ localDtr.applicationFinished(applicationId_0);
+
+ Thread.sleep(4000l);
+
+ //Send another keep alive.
+ localDtr.updateKeepAliveApplications(Collections
+ .singletonList(applicationId_0));
+ //Renewal should not fail.
+ token1.renew(lconf);
+
+ //Token should be around after this.
+ Thread.sleep(4500l);
+ //Renewal should not fail. - ~1.5 seconds for keepalive timeout.
+ token1.renew(lconf);
+
+ //Allow the keepalive time to run out
+ Thread.sleep(3000l);
+ //The token should have been cancelled at this point. Renewal will fail.
+ try {
+ token1.renew(lconf);
+ fail("Renewal of cancelled token should have failed");
+ } catch (InvalidToken ite) {}
+ }
}
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java Wed Dec 14 19:58:00 2011
@@ -395,7 +395,7 @@ public class TestRMWebServices extends J
nodeHealth.setHealthReport("test health report");
nodeHealth.setIsNodeHealthy(false);
node.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeHealth,
- new ArrayList<ContainerStatus>(), null));
+ new ArrayList<ContainerStatus>(), null, null));
rm.NMwaitForState(nm1.getNodeId(), RMNodeState.UNHEALTHY);
JSONObject json = r.path("ws").path("v1").path("cluster").path("nodes")
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm?rev=1214432&r1=1214431&r2=1214432&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm Wed Dec 14 19:58:00 2011
@@ -194,6 +194,10 @@ Hadoop MapReduce Next Generation - Clust
| | | Defaults to special value of <<*>> which means <anyone>. |
| | | Special value of just <space> means no one has access. |
*-------------------------+-------------------------+------------------------+
+| <<<yarn.log-aggregation-enable>>> | | |
+| | <false> | |
+| | | Configuration to enable or disable log aggregation |
+*-------------------------+-------------------------+------------------------+
* Configurations for ResourceManager:
@@ -260,10 +264,6 @@ Hadoop MapReduce Next Generation - Clust
| | are written. | |
| | | Multiple paths help spread disk i/o. |
*-------------------------+-------------------------+------------------------+
-| <<<yarn.nodemanager.log-aggregation-enable>>> | | |
-| | <false> | |
-| | | Configuration to enable or disable log aggregation |
-*-------------------------+-------------------------+------------------------+
| <<<yarn.nodemanager.log.retain-seconds>>> | | |
| | <10800> | |
| | | Default time (in seconds) to retain log files on the NodeManager |