You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/12/14 20:55:23 UTC

svn commit: r1214429 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/ hadoop-yarn/hadoop-yar...

Author: vinodkv
Date: Wed Dec 14 19:55:22 2011
New Revision: 1214429

URL: http://svn.apache.org/viewvc?rev=1214429&view=rev
Log:
MAPREDUCE-3398. Fixed log aggregation to work correctly in secure mode. Contributed by Siddharth Seth.

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Dec 14 19:55:22 2011
@@ -296,6 +296,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3541. Fix broken TestJobQueueClient test. (Ravi Prakash via 
     mahadev)
 
+    MAPREDUCE-3398. Fixed log aggregation to work correctly in secure mode.
+    (Siddharth Seth via vinodkv)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Wed Dec 14 19:55:22 2011
@@ -91,12 +91,7 @@ public class YarnConfiguration extends C
   public static final String RM_CLIENT_THREAD_COUNT =
     RM_PREFIX + "client.thread-count";
   public static final int DEFAULT_RM_CLIENT_THREAD_COUNT = 10;
-  
-  /** The expiry interval for application master reporting.*/
-  public static final String RM_AM_EXPIRY_INTERVAL_MS = 
-    RM_PREFIX  + "am.liveness-monitor.expiry-interval-ms";
-  public static final int DEFAULT_RM_AM_EXPIRY_INTERVAL_MS = 600000;
-  
+
   /** The Kerberos principal for the resource manager.*/
   public static final String RM_PRINCIPAL =
     RM_PREFIX + "principal";
@@ -126,7 +121,17 @@ public class YarnConfiguration extends C
   public static final int DEFAULT_RM_RESOURCE_TRACKER_PORT = 8025;
   public static final String DEFAULT_RM_RESOURCE_TRACKER_ADDRESS =
     "0.0.0.0:" + DEFAULT_RM_RESOURCE_TRACKER_PORT;
-  
+
+  /** The expiry interval for application master reporting.*/
+  public static final String RM_AM_EXPIRY_INTERVAL_MS = 
+    YARN_PREFIX  + "am.liveness-monitor.expiry-interval-ms";
+  public static final int DEFAULT_RM_AM_EXPIRY_INTERVAL_MS = 600000;
+
+  /** How long to wait until a node manager is considered dead.*/
+  public static final String RM_NM_EXPIRY_INTERVAL_MS = 
+    YARN_PREFIX + "nm.liveness-monitor.expiry-interval-ms";
+  public static final int DEFAULT_RM_NM_EXPIRY_INTERVAL_MS = 600000;
+
   /** Are acls enabled.*/
   public static final String YARN_ACL_ENABLE = 
     YARN_PREFIX + "acl.enable";
@@ -160,12 +165,7 @@ public class YarnConfiguration extends C
   /** The keytab for the resource manager.*/
   public static final String RM_KEYTAB = 
     RM_PREFIX + "keytab";
-  
-  /** How long to wait until a node manager is considered dead.*/
-  public static final String RM_NM_EXPIRY_INTERVAL_MS = 
-    RM_PREFIX + "nm.liveness-monitor.expiry-interval-ms";
-  public static final int DEFAULT_RM_NM_EXPIRY_INTERVAL_MS = 600000;
-  
+
   /** How long to wait until a container is considered dead.*/
   public static final String RM_CONTAINER_ALLOC_EXPIRY_INTERVAL_MS = 
     RM_PREFIX + "rm.container-allocation.expiry-interval-ms";
@@ -293,10 +293,16 @@ public class YarnConfiguration extends C
   public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
   public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
 
+  /** Interval at which the delayed token removal thread runs */
+  public static final String RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
+      RM_PREFIX + "delayed.delegation-token.removal-interval-ms";
+  public static final long DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS =
+      30000l;
+
   /** Whether to enable log aggregation */
-  public static final String NM_LOG_AGGREGATION_ENABLED = NM_PREFIX
+  public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
       + "log-aggregation-enable";
-  public static final boolean DEFAULT_NM_LOG_AGGREGATION_ENABLED = false;
+  public static final boolean DEFAULT_LOG_AGGREGATION_ENABLED = false;
   
   /**
    * Number of seconds to retain logs on the NodeManager. Only applicable if Log

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/webapp/log/AggregatedLogsBlock.java Wed Dec 14 19:55:22 2011
@@ -53,8 +53,8 @@ public class AggregatedLogsBlock extends
       logEntity = containerId.toString();
     }
 
-    if (!conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
-        YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
+    if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+        YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
       html.h1()
           ._("Aggregation is not enabled. Try the nodemanager at " + nodeId)
           ._();

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java Wed Dec 14 19:55:22 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.ap
 
 import java.util.List;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -33,6 +34,9 @@ public interface NodeStatus {
   public abstract void setContainersStatuses(
       List<ContainerStatus> containersStatuses);
 
+  public abstract List<ApplicationId> getKeepAliveApplications();
+  public abstract void setKeepAliveApplications(List<ApplicationId> appIds);
+  
   NodeHealthStatus getNodeHealthStatus();
   void setNodeHealthStatus(NodeHealthStatus healthStatus);
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java Wed Dec 14 19:55:22 2011
@@ -23,13 +23,16 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeHealthStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
+import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeHealthStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
@@ -37,7 +40,9 @@ import org.apache.hadoop.yarn.proto.Yarn
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProtoOrBuilder;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
     
-public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements NodeStatus {
+
+public class NodeStatusPBImpl extends ProtoBase<NodeStatusProto> implements
+    NodeStatus {
   NodeStatusProto proto = NodeStatusProto.getDefaultInstance();
   NodeStatusProto.Builder builder = null;
   boolean viaProto = false;
@@ -45,6 +50,7 @@ public class NodeStatusPBImpl extends Pr
   private NodeId nodeId = null;
   private List<ContainerStatus> containers = null;
   private NodeHealthStatus nodeHealthStatus = null;
+  private List<ApplicationId> keepAliveApplications = null;
   
   public NodeStatusPBImpl() {
     builder = NodeStatusProto.newBuilder();
@@ -55,15 +61,14 @@ public class NodeStatusPBImpl extends Pr
     viaProto = true;
   }
   
-  public NodeStatusProto getProto() {
-
-      mergeLocalToProto();
+  public synchronized NodeStatusProto getProto() {
+    mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
     viaProto = true;
     return proto;
   }
 
-  private void mergeLocalToBuilder() {
+  private synchronized void mergeLocalToBuilder() {
     if (this.nodeId != null) {
       builder.setNodeId(convertToProtoFormat(this.nodeId));
     }
@@ -73,9 +78,12 @@ public class NodeStatusPBImpl extends Pr
     if (this.nodeHealthStatus != null) {
       builder.setNodeHealthStatus(convertToProtoFormat(this.nodeHealthStatus));
     }
+    if (this.keepAliveApplications != null) {
+      addKeepAliveApplicationsToProto();
+    }
   }
 
-  private void mergeLocalToProto() {
+  private synchronized void mergeLocalToProto() {
     if (viaProto) 
       maybeInitBuilder();
     mergeLocalToBuilder();
@@ -84,14 +92,14 @@ public class NodeStatusPBImpl extends Pr
     viaProto = true;
   }
 
-  private void maybeInitBuilder() {
+  private synchronized void maybeInitBuilder() {
     if (viaProto || builder == null) {
       builder = NodeStatusProto.newBuilder(proto);
     }
     viaProto = false;
   }
     
-  private void addContainersToProto() {
+  private synchronized void addContainersToProto() {
     maybeInitBuilder();
     builder.clearContainersStatuses();
     if (containers == null)
@@ -124,19 +132,53 @@ public class NodeStatusPBImpl extends Pr
     };
     builder.addAllContainersStatuses(iterable);
   }
+  
+  private synchronized void addKeepAliveApplicationsToProto() {
+    maybeInitBuilder();
+    builder.clearKeepAliveApplications();
+    if (keepAliveApplications == null)
+      return;
+    Iterable<ApplicationIdProto> iterable = new Iterable<ApplicationIdProto>() {
+      @Override
+      public Iterator<ApplicationIdProto> iterator() {
+        return new Iterator<ApplicationIdProto>() {
+  
+          Iterator<ApplicationId> iter = keepAliveApplications.iterator();
+  
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+  
+          @Override
+          public ApplicationIdProto next() {
+            return convertToProtoFormat(iter.next());
+          }
+  
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+  
+          }
+        };
+  
+      }
+    };
+    builder.addAllKeepAliveApplications(iterable);
+  }
 
   @Override
-  public int getResponseId() {
+  public synchronized int getResponseId() {
     NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
     return p.getResponseId();
   }
   @Override
-  public void setResponseId(int responseId) {
+  public synchronized void setResponseId(int responseId) {
     maybeInitBuilder();
     builder.setResponseId(responseId);
   }
   @Override
-  public NodeId getNodeId() {
+  public synchronized NodeId getNodeId() {
     NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
     if (this.nodeId != null) {
       return this.nodeId;
@@ -148,8 +190,9 @@ public class NodeStatusPBImpl extends Pr
     
     return this.nodeId;
   }
+  
   @Override
-  public void setNodeId(NodeId nodeId) {
+  public synchronized void setNodeId(NodeId nodeId) {
     maybeInitBuilder();
     if (nodeId == null)
       builder.clearNodeId();
@@ -158,20 +201,35 @@ public class NodeStatusPBImpl extends Pr
   }
   
   @Override
-  public List<ContainerStatus> getContainersStatuses() {
+  public synchronized List<ContainerStatus> getContainersStatuses() {
     initContainers();
     return this.containers;
   }
 
   @Override
-  public void setContainersStatuses(List<ContainerStatus> containers) {
+  public synchronized void setContainersStatuses(
+      List<ContainerStatus> containers) {
     if (containers == null) {
       builder.clearContainersStatuses();
     }
     this.containers = containers;
   }
+  
+  @Override
+  public synchronized List<ApplicationId> getKeepAliveApplications() {
+    initKeepAliveApplications();
+    return this.keepAliveApplications;
+  }
+  
+  @Override
+  public synchronized void setKeepAliveApplications(List<ApplicationId> appIds) {
+    if (appIds == null) {
+      builder.clearKeepAliveApplications();
+    }
+    this.keepAliveApplications = appIds;
+  }
 
-  private void initContainers() {
+  private synchronized void initContainers() {
     if (this.containers != null) {
       return;
     }
@@ -185,8 +243,22 @@ public class NodeStatusPBImpl extends Pr
     
   }
   
+  private synchronized void initKeepAliveApplications() {
+    if (this.keepAliveApplications != null) {
+      return;
+    }
+    NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
+    List<ApplicationIdProto> list = p.getKeepAliveApplicationsList();
+    this.keepAliveApplications = new ArrayList<ApplicationId>();
+
+    for (ApplicationIdProto c : list) {
+      this.keepAliveApplications.add(convertFromProtoFormat(c));
+    }
+    
+  }
+  
   @Override
-  public NodeHealthStatus getNodeHealthStatus() {
+  public synchronized NodeHealthStatus getNodeHealthStatus() {
     NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
     if (nodeHealthStatus != null) {
       return nodeHealthStatus;
@@ -199,7 +271,7 @@ public class NodeStatusPBImpl extends Pr
   }
 
   @Override
-  public void setNodeHealthStatus(NodeHealthStatus healthStatus) {
+  public synchronized void setNodeHealthStatus(NodeHealthStatus healthStatus) {
     maybeInitBuilder();
     if (healthStatus == null) {
       builder.clearNodeHealthStatus();
@@ -231,4 +303,12 @@ public class NodeStatusPBImpl extends Pr
   private ContainerStatusProto convertToProtoFormat(ContainerStatus c) {
     return ((ContainerStatusPBImpl)c).getProto();
   }
-}  
+  
+  private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto c) {
+    return new ApplicationIdPBImpl(c);
+  }
+  
+  private ApplicationIdProto convertToProtoFormat(ApplicationId c) {
+    return ((ApplicationIdPBImpl)c).getProto();
+  }
+}

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto Wed Dec 14 19:55:22 2011
@@ -34,6 +34,7 @@ message NodeStatusProto {
   optional int32 response_id = 2;
   repeated ContainerStatusProto containersStatuses = 3;
   optional NodeHealthStatusProto nodeHealthStatus = 4;
+  repeated ApplicationIdProto keep_alive_applications = 5;
 }
 
 message RegistrationResponseProto {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/resources/yarn-default.xml Wed Dec 14 19:55:22 2011
@@ -72,7 +72,7 @@
 
   <property>
     <description>The expiry interval for application master reporting.</description>
-    <name>yarn.resourcemanager.am.liveness-monitor.expiry-interval-ms</name>
+    <name>yarn.am.liveness-monitor.expiry-interval-ms</name>
     <value>600000</value>
   </property>
 
@@ -155,7 +155,7 @@
 
   <property>
     <description>How long to wait until a node manager is considered dead.</description>
-    <name>yarn.resourcemanager.nm.liveness-monitor.expiry-interval-ms</name>
+    <name>yarn.nm.liveness-monitor.expiry-interval-ms</name>
     <value>600000</value>
   </property>
 
@@ -210,6 +210,12 @@
     <value>10000</value>
   </property>
 
+  <property>
+    <description>Interval at which the delayed token removal thread runs</description>
+    <name>yarn.resourcemanager.delayed.delegation-token.removal-interval-ms</name>
+    <value>30000</value>
+  </property>
+
   <!-- Node Manager Configs -->
   <property>
     <description>address of node manager IPC.</description>
@@ -304,7 +310,7 @@
 
   <property>
     <description>Whether to enable log aggregation</description>
-    <name>yarn.nodemanager.log-aggregation-enable</name>
+    <name>yarn.log-aggregation-enable</name>
     <value>false</value>
   </property>
   

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Wed Dec 14 19:55:22 2011
@@ -20,8 +20,12 @@ package org.apache.hadoop.yarn.server.no
 
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
+import java.util.Random;
 import java.util.Map.Entry;
 
 import org.apache.avro.AvroRuntimeException;
@@ -56,6 +60,7 @@ import org.apache.hadoop.yarn.server.nod
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.AbstractService;
 
+
 public class NodeStatusUpdaterImpl extends AbstractService implements
     NodeStatusUpdater {
 
@@ -76,6 +81,12 @@ public class NodeStatusUpdaterImpl exten
   private byte[] secretKeyBytes = new byte[0];
   private boolean isStopped;
   private RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+  private boolean tokenKeepAliveEnabled;
+  private long tokenRemovalDelayMs;
+  /** Keeps track of when the next keep alive request should be sent for an app*/
+  private Map<ApplicationId, Long> appTokenKeepAliveMap =
+      new HashMap<ApplicationId, Long>();
+  private Random keepAliveDelayRandom = new Random();
 
   private final NodeHealthCheckerService healthChecker;
   private final NodeManagerMetrics metrics;
@@ -103,6 +114,13 @@ public class NodeStatusUpdaterImpl exten
     this.totalResource = recordFactory.newRecordInstance(Resource.class);
     this.totalResource.setMemory(memoryMb);
     metrics.addResource(totalResource);
+    this.tokenKeepAliveEnabled =
+        conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+            YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)
+            && isSecurityEnabled();
+    this.tokenRemovalDelayMs =
+        conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
     super.init(conf);
   }
 
@@ -139,6 +157,10 @@ public class NodeStatusUpdaterImpl exten
     super.stop();
   }
 
+  protected boolean isSecurityEnabled() {
+    return UserGroupInformation.isSecurityEnabled();
+  }
+
   protected ResourceTracker getRMClient() {
     Configuration conf = getConfig();
     YarnRPC rpc = YarnRPC.create(conf);
@@ -188,6 +210,29 @@ public class NodeStatusUpdaterImpl exten
     return this.secretKeyBytes.clone();
   }
 
+  private List<ApplicationId> createKeepAliveApplicationList() {
+    if (!tokenKeepAliveEnabled) {
+      return Collections.emptyList();
+    }
+
+    List<ApplicationId> appList = new ArrayList<ApplicationId>();
+    for (Iterator<Entry<ApplicationId, Long>> i =
+        this.appTokenKeepAliveMap.entrySet().iterator(); i.hasNext();) {
+      Entry<ApplicationId, Long> e = i.next();
+      ApplicationId appId = e.getKey();
+      Long nextKeepAlive = e.getValue();
+      if (!this.context.getApplications().containsKey(appId)) {
+        // Remove if the application has finished.
+        i.remove();
+      } else if (System.currentTimeMillis() > nextKeepAlive) {
+        // KeepAlive list for the next heartbeat.
+        appList.add(appId);
+        trackAppForKeepAlive(appId);
+      }
+    }
+    return appList;
+  }
+
   private NodeStatus getNodeStatus() {
 
     NodeStatus nodeStatus = recordFactory.newRecordInstance(NodeStatus.class);
@@ -231,9 +276,29 @@ public class NodeStatusUpdaterImpl exten
     }
     nodeStatus.setNodeHealthStatus(nodeHealthStatus);
 
+    List<ApplicationId> keepAliveAppIds = createKeepAliveApplicationList();
+    nodeStatus.setKeepAliveApplications(keepAliveAppIds);
+    
     return nodeStatus;
   }
 
+  private void trackAppsForKeepAlive(List<ApplicationId> appIds) {
+    if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) {
+      for (ApplicationId appId : appIds) {
+        trackAppForKeepAlive(appId);
+      }
+    }
+  }
+
+  private void trackAppForKeepAlive(ApplicationId appId) {
+    // Next keepAlive request for app between 0.7 & 0.9 of when the token will
+    // likely expire.
+    long nextTime = System.currentTimeMillis()
+    + (long) (0.7 * tokenRemovalDelayMs + (0.2 * tokenRemovalDelayMs
+        * keepAliveDelayRandom.nextInt(100))/100);
+    appTokenKeepAliveMap.put(appId, nextTime);
+  }
+
   @Override
   public void sendOutofBandHeartBeat() {
     synchronized (this.heartbeatMonitor) {
@@ -245,6 +310,7 @@ public class NodeStatusUpdaterImpl exten
 
     new Thread("Node Status Updater") {
       @Override
+      @SuppressWarnings("unchecked")
       public void run() {
         int lastHeartBeatID = 0;
         while (!isStopped) {
@@ -284,6 +350,8 @@ public class NodeStatusUpdaterImpl exten
             }
             List<ApplicationId> appsToCleanup =
                 response.getApplicationsToCleanupList();
+            //Only start tracking for keepAlive on FINISH_APP
+            trackAppsForKeepAlive(appsToCleanup);
             if (appsToCleanup.size() != 0) {
               dispatcher.getEventHandler().handle(
                   new CMgrCompletedAppsEvent(appsToCleanup));

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java Wed Dec 14 19:55:22 2011
@@ -192,8 +192,8 @@ public class ContainerManagerImpl extend
 
   protected LogHandler createLogHandler(Configuration conf, Context context,
       DeletionService deletionService) {
-    if (conf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
-        YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
+    if (conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+        YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
       return new LogAggregationService(this.dispatcher, context,
           deletionService, dirsHandler);
     } else {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java Wed Dec 14 19:55:22 2011
@@ -170,6 +170,7 @@ public class AppLogAggregatorImpl implem
       this.writer.closeWriter();
       LOG.info("Finished aggregate log-file for app " + this.applicationId);
     }
+
     try {
       userUgi.doAs(new PrivilegedExceptionAction<Object>() {
         @Override

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMController.java Wed Dec 14 19:55:22 2011
@@ -88,8 +88,8 @@ public class NMController extends Contro
         containerId.getApplicationAttemptId().getApplicationId();
     Application app = nmContext.getApplications().get(appId);
     if (app == null
-        && nmConf.getBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED,
-            YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_ENABLED)) {
+        && nmConf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+            YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
       String logServerUrl = nmConf.get(YarnConfiguration.YARN_LOG_SERVER_URL);
       String redirectUrl = null;
       if (logServerUrl == null || logServerUrl.isEmpty()) {

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Wed Dec 14 19:55:22 2011
@@ -22,7 +22,9 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
@@ -56,6 +58,7 @@ import org.apache.hadoop.yarn.server.api
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
 import org.apache.hadoop.yarn.server.api.records.RegistrationResponse;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
@@ -63,10 +66,12 @@ import org.apache.hadoop.yarn.server.sec
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.service.Service;
 import org.apache.hadoop.yarn.service.Service.STATE;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import static org.mockito.Mockito.mock;
 
 public class TestNodeStatusUpdater {
 
@@ -216,7 +221,7 @@ public class TestNodeStatusUpdater {
       HeartbeatResponse response = recordFactory
           .newRecordInstance(HeartbeatResponse.class);
       response.setResponseId(heartBeatID);
-      
+
       NodeHeartbeatResponse nhResponse = recordFactory
           .newRecordInstance(NodeHeartbeatResponse.class);
       nhResponse.setHeartbeatResponse(response);
@@ -241,6 +246,48 @@ public class TestNodeStatusUpdater {
       return resourceTracker;
     }
   }
+
+  private class MyNodeStatusUpdater3 extends NodeStatusUpdaterImpl {
+    public ResourceTracker resourceTracker;
+    private Context context;
+
+    public MyNodeStatusUpdater3(Context context, Dispatcher dispatcher,
+        NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics,
+        ContainerTokenSecretManager containerTokenSecretManager) {
+      super(context, dispatcher, healthChecker, metrics,
+          containerTokenSecretManager);
+      this.context = context;
+      this.resourceTracker = new MyResourceTracker3(this.context);
+    }
+
+    @Override
+    protected ResourceTracker getRMClient() {
+      return resourceTracker;
+    }
+    
+    @Override
+    protected boolean isSecurityEnabled() {
+      return true;
+    }
+  }
+
+  private class MyNodeManager extends NodeManager {
+    
+    private MyNodeStatusUpdater3 nodeStatusUpdater;
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker,
+        ContainerTokenSecretManager containerTokenSecretManager) {
+      this.nodeStatusUpdater =
+          new MyNodeStatusUpdater3(context, dispatcher, healthChecker, metrics,
+              containerTokenSecretManager);
+      return this.nodeStatusUpdater;
+    }
+
+    protected MyNodeStatusUpdater3 getNodeStatusUpdater() {
+      return this.nodeStatusUpdater;
+    }
+  }
   
   // 
   private class MyResourceTracker2 implements ResourceTracker {
@@ -276,6 +323,65 @@ public class TestNodeStatusUpdater {
     }
   }
   
+  private class MyResourceTracker3 implements ResourceTracker {
+    public NodeAction heartBeatNodeAction = NodeAction.NORMAL;
+    public NodeAction registerNodeAction = NodeAction.NORMAL;
+    private Map<ApplicationId, List<Long>> keepAliveRequests =
+        new HashMap<ApplicationId, List<Long>>();
+    private ApplicationId appId = BuilderUtils.newApplicationId(1, 1);
+    private final Context context;
+
+    MyResourceTracker3(Context context) {
+      this.context = context;
+    }
+    
+    @Override
+    public RegisterNodeManagerResponse registerNodeManager(
+        RegisterNodeManagerRequest request) throws YarnRemoteException {
+
+      RegisterNodeManagerResponse response =
+          recordFactory.newRecordInstance(RegisterNodeManagerResponse.class);
+      RegistrationResponse regResponse =
+          recordFactory.newRecordInstance(RegistrationResponse.class);
+      regResponse.setNodeAction(registerNodeAction);
+      response.setRegistrationResponse(regResponse);
+      return response;
+    }
+
+    @Override
+    public NodeHeartbeatResponse nodeHeartbeat(NodeHeartbeatRequest request)
+        throws YarnRemoteException {
+      LOG.info("Got heartBeatId: [" + heartBeatID +"]");
+      NodeStatus nodeStatus = request.getNodeStatus();
+      nodeStatus.setResponseId(heartBeatID++);
+      HeartbeatResponse response =
+          recordFactory.newRecordInstance(HeartbeatResponse.class);
+      response.setResponseId(heartBeatID);
+      response.setNodeAction(heartBeatNodeAction);
+
+      if (nodeStatus.getKeepAliveApplications() != null
+          && nodeStatus.getKeepAliveApplications().size() > 0) {
+        for (ApplicationId appId : nodeStatus.getKeepAliveApplications()) {
+          List<Long> list = keepAliveRequests.get(appId);
+          if (list == null) {
+            list = new LinkedList<Long>();
+            keepAliveRequests.put(appId, list);
+          }
+          list.add(System.currentTimeMillis());
+        }
+      }
+      if (heartBeatID == 2) {
+        LOG.info("Sending FINISH_APP for application: [" + appId + "]");
+        this.context.getApplications().put(appId, mock(Application.class));
+        response.addAllApplicationsToCleanup(Collections.singletonList(appId));
+      }
+      NodeHeartbeatResponse nhResponse =
+          recordFactory.newRecordInstance(NodeHeartbeatResponse.class);
+      nhResponse.setHeartbeatResponse(response);
+      return nhResponse;
+    }
+  }
+
   @Before
   public void clearError() {
     nmStartError = null;
@@ -456,6 +562,38 @@ public class TestNodeStatusUpdater {
     verifyNodeStartFailure("Starting of RPC Server failed");
   }
 
+  @Test
+  public void testApplicationKeepAlive() throws Exception {
+    MyNodeManager nm = new MyNodeManager();
+    try {
+      YarnConfiguration conf = createNMConfig();
+      conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+      conf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
+          4000l);
+      nm.init(conf);
+      nm.start();
+      // HB 2 -> app cancelled by RM.
+      while (heartBeatID < 12) {
+        Thread.sleep(1000l);
+      }
+      MyResourceTracker3 rt =
+          (MyResourceTracker3) nm.getNodeStatusUpdater().getRMClient();
+      rt.context.getApplications().remove(rt.appId);
+      Assert.assertEquals(1, rt.keepAliveRequests.size());
+      int numKeepAliveRequests = rt.keepAliveRequests.get(rt.appId).size();
+      LOG.info("Number of Keep Alive Requests: [" + numKeepAliveRequests + "]");
+      Assert.assertTrue(numKeepAliveRequests == 2 || numKeepAliveRequests == 3);
+      while (heartBeatID < 20) {
+        Thread.sleep(1000l);
+      }
+      int numKeepAliveRequests2 = rt.keepAliveRequests.get(rt.appId).size();
+      Assert.assertEquals(numKeepAliveRequests, numKeepAliveRequests2);
+    } finally {
+      if (nm.getServiceState() == STATE.STARTED)
+        nm.stop();
+    }
+  }
+
   private void verifyNodeStartFailure(String errMessage) {
     YarnConfiguration conf = createNMConfig();
     nm.init(conf);

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/TestNonAggregatingLogHandler.java Wed Dec 14 19:55:22 2011
@@ -68,7 +68,7 @@ public class TestNonAggregatingLogHandle
             + localLogDirs[1].getAbsolutePath();
 
     conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
-    conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 0l);
 
     DrainDispatcher dispatcher = createDispatcher(conf);
@@ -142,7 +142,7 @@ public class TestNonAggregatingLogHandle
             + localLogDirs[1].getAbsolutePath();
 
     conf.set(YarnConfiguration.NM_LOG_DIRS, localLogDirsString);
-    conf.setBoolean(YarnConfiguration.NM_LOG_AGGREGATION_ENABLED, false);
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
 
     conf.setLong(YarnConfiguration.NM_LOG_RETAIN_SECONDS, 10800l);
 

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java Wed Dec 14 19:55:22 2011
@@ -173,7 +173,7 @@ public class RMAppManager implements Eve
     } else {
       // Inform the DelegationTokenRenewer
       if (UserGroupInformation.isSecurityEnabled()) {
-        rmContext.getDelegationTokenRenewer().removeApplication(applicationId);
+        rmContext.getDelegationTokenRenewer().applicationFinished(applicationId);
       }
       
       completedApps.add(applicationId);  

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Wed Dec 14 19:55:22 2011
@@ -272,7 +272,8 @@ public class ResourceTrackerService exte
     // 4. Send status to RMNode, saving the latest response.
     this.rmContext.getDispatcher().getEventHandler().handle(
         new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),
-            remoteNodeStatus.getContainersStatuses(), latestResponse));
+            remoteNodeStatus.getContainersStatuses(), 
+            remoteNodeStatus.getKeepAliveApplications(), latestResponse));
 
     nodeHeartBeatResponse.setHeartbeatResponse(latestResponse);
     return nodeHeartBeatResponse;

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java Wed Dec 14 19:55:22 2011
@@ -414,7 +414,9 @@ public class RMNodeImpl implements RMNod
       rmNode.context.getDispatcher().getEventHandler().handle(
           new NodeUpdateSchedulerEvent(rmNode, newlyLaunchedContainers, 
               completedContainers));
-
+      
+      rmNode.context.getDelegationTokenRenewer().updateKeepAliveApplications(
+          statusEvent.getKeepAliveAppIds());
       return RMNodeState.RUNNING;
     }
   }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStatusEvent.java Wed Dec 14 19:55:22 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import java.util.List;
 
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -28,15 +29,17 @@ import org.apache.hadoop.yarn.server.api
 public class RMNodeStatusEvent extends RMNodeEvent {
 
   private final NodeHealthStatus nodeHealthStatus;
-  private List<ContainerStatus> containersCollection;
+  private final List<ContainerStatus> containersCollection;
   private final HeartbeatResponse latestResponse;
+  private final List<ApplicationId> keepAliveAppIds;
 
   public RMNodeStatusEvent(NodeId nodeId, NodeHealthStatus nodeHealthStatus,
-      List<ContainerStatus> collection,
+      List<ContainerStatus> collection, List<ApplicationId> keepAliveAppIds,
       HeartbeatResponse latestResponse) {
     super(nodeId, RMNodeEventType.STATUS_UPDATE);
     this.nodeHealthStatus = nodeHealthStatus;
     this.containersCollection = collection;
+    this.keepAliveAppIds = keepAliveAppIds;
     this.latestResponse = latestResponse;
   }
 
@@ -51,4 +54,8 @@ public class RMNodeStatusEvent extends R
   public HeartbeatResponse getLatestResponse() {
     return this.latestResponse;
   }
-}
+  
+  public List<ApplicationId> getKeepAliveAppIds() {
+    return this.keepAliveAppIds;
+  }
+}
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java Wed Dec 14 19:55:22 2011
@@ -20,14 +20,19 @@ package org.apache.hadoop.yarn.server.re
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
 import org.apache.commons.logging.Log;
@@ -40,6 +45,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.service.AbstractService;
 
 /**
@@ -65,7 +71,16 @@ public class DelegationTokenRenewer exte
   // appId=>List<tokens>
   private Set<DelegationTokenToRenew> delegationTokens = 
     Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
+  
+  private final ConcurrentMap<ApplicationId, Long> delayedRemovalMap =
+      new ConcurrentHashMap<ApplicationId, Long>();
 
+  private long tokenRemovalDelayMs;
+  
+  private Thread delayedRemovalThread;
+  
+  private boolean tokenKeepAliveEnabled;
+  
   public DelegationTokenRenewer() {
     super(DelegationTokenRenewer.class.getName());
   }
@@ -73,6 +88,12 @@ public class DelegationTokenRenewer exte
   @Override
   public synchronized void init(Configuration conf) {
     super.init(conf);
+    this.tokenKeepAliveEnabled =
+        conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+            YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
+    this.tokenRemovalDelayMs =
+        conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
+            YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
   }
 
   @Override
@@ -81,6 +102,12 @@ public class DelegationTokenRenewer exte
     
     dtCancelThread.start();
     renewalTimer = new Timer(true);
+    if (tokenKeepAliveEnabled) {
+      delayedRemovalThread =
+          new Thread(new DelayedTokenRemovalRunnable(getConfig()),
+              "DelayedTokenCanceller");
+      delayedRemovalThread.start();
+    }
   }
 
   @Override
@@ -94,6 +121,14 @@ public class DelegationTokenRenewer exte
     } catch (InterruptedException e) {
       e.printStackTrace();
     }
+    if (tokenKeepAliveEnabled && delayedRemovalThread != null) {
+      delayedRemovalThread.interrupt();
+      try {
+        delayedRemovalThread.join(1000);
+      } catch (InterruptedException e) {
+        LOG.info("Interrupted while joining on delayed removal thread.", e);
+      }
+    }
     
     super.stop();
   }
@@ -343,12 +378,38 @@ public class DelegationTokenRenewer exte
     if(t.timerTask!=null)
       t.timerTask.cancel();
   }
-  
+
   /**
    * Removing delegation token for completed applications.
    * @param applicationId completed application
    */
-  public void removeApplication(ApplicationId applicationId) {
+  public void applicationFinished(ApplicationId applicationId) {
+    if (!tokenKeepAliveEnabled) {
+      removeApplicationFromRenewal(applicationId);
+    } else {
+      delayedRemovalMap.put(applicationId, System.currentTimeMillis()
+          + tokenRemovalDelayMs);
+    }
+  }
+
+  /**
+   * Add a list of applications to the keep alive list. If an appId already
+   * exists, update its keep-alive time.
+   * 
+   * @param appIds
+   *          the list of applicationIds to be kept alive.
+   * 
+   */
+  public void updateKeepAliveApplications(List<ApplicationId> appIds) {
+    if (tokenKeepAliveEnabled && appIds != null && appIds.size() > 0) {
+      for (ApplicationId appId : appIds) {
+        delayedRemovalMap.put(appId, System.currentTimeMillis()
+            + tokenRemovalDelayMs);
+      }
+    }
+  }
+
+  private void removeApplicationFromRenewal(ApplicationId applicationId) {
     synchronized (delegationTokens) {
       Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
       while(it.hasNext()) {
@@ -371,4 +432,50 @@ public class DelegationTokenRenewer exte
       }
     }
   }
+
+  /**
+   * Takes care of cancelling app delegation tokens after the configured
+   * cancellation delay, taking into consideration keep-alive requests.
+   * 
+   */
+  private class DelayedTokenRemovalRunnable implements Runnable {
+
+    private long waitTimeMs;
+
+    DelayedTokenRemovalRunnable(Configuration conf) {
+      waitTimeMs =
+          conf.getLong(
+              YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
+              YarnConfiguration.DEFAULT_RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS);
+    }
+
+    @Override
+    public void run() {
+      List<ApplicationId> toCancel = new ArrayList<ApplicationId>();
+      while (!Thread.currentThread().isInterrupted()) {
+        Iterator<Entry<ApplicationId, Long>> it =
+            delayedRemovalMap.entrySet().iterator();
+        toCancel.clear();
+        while (it.hasNext()) {
+          Entry<ApplicationId, Long> e = it.next();
+          if (e.getValue() < System.currentTimeMillis()) {
+            toCancel.add(e.getKey());
+          }
+        }
+        for (ApplicationId appId : toCancel) {
+          removeApplicationFromRenewal(appId);
+          delayedRemovalMap.remove(appId);
+        }
+        synchronized (this) {
+          try {
+            wait(waitTimeMs);
+          } catch (InterruptedException e) {
+            LOG.info("Delayed Deletion Thread Interrupted. Shutting it down");
+            return;
+          }
+        }
+      }
+    }
+  }
+  
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java Wed Dec 14 19:55:22 2011
@@ -20,12 +20,12 @@ package org.apache.hadoop.yarn.server.re
 
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collections;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -44,6 +44,7 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.junit.After;
 import org.junit.Before;
@@ -328,7 +329,7 @@ public class TestDelegationTokenRenewer 
 
     ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
     delegationTokenRenewer.addApplication(applicationId_1, ts, true);
-    delegationTokenRenewer.removeApplication(applicationId_1);
+    delegationTokenRenewer.applicationFinished(applicationId_1);
     
     numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
     try {
@@ -343,7 +344,7 @@ public class TestDelegationTokenRenewer 
     // also renewing of the cancelled token should fail
     try {
       token4.renew(conf);
-      assertTrue("Renewal of canceled token didn't fail", false);
+      fail("Renewal of cancelled token should have failed");
     } catch (InvalidToken ite) {
       //expected
     }
@@ -377,7 +378,7 @@ public class TestDelegationTokenRenewer 
 
     ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
     delegationTokenRenewer.addApplication(applicationId_1, ts, false);
-    delegationTokenRenewer.removeApplication(applicationId_1);
+    delegationTokenRenewer.applicationFinished(applicationId_1);
     
     int numberOfExpectedRenewals = Renewer.counter; // number of renewals so far
     try {
@@ -393,4 +394,123 @@ public class TestDelegationTokenRenewer 
     // been canceled
     token1.renew(conf);
   }
+  
+  /**
+   * Basic idea of the test:
+   * 0. Setup token KEEP_ALIVE
+   * 1. create tokens.
+   * 2. register them for renewal - to be cancelled on app complete
+   * 3. Complete app.
+   * 4. Verify token is alive within the KEEP_ALIVE time
+   * 5. Verify token has been cancelled after the KEEP_ALIVE_TIME
+   * @throws IOException
+   * @throws URISyntaxException
+   */
+  @Test
+  public void testDTKeepAlive1 () throws Exception {
+    DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
+    Configuration lconf = new Configuration(conf);
+    lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+    //Keep tokens alive for 6 seconds.
+    lconf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 6000l);
+    //Try removing tokens every second.
+    lconf.setLong(
+        YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
+        1000l);
+    localDtr.init(lconf);
+    localDtr.start();
+    
+    MyFS dfs = (MyFS)FileSystem.get(lconf);
+    LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+lconf.hashCode());
+    
+    Credentials ts = new Credentials();
+    // get the delegation tokens
+    MyToken token1 = dfs.getDelegationToken(new Text("user1"));
+
+    String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0";
+    ts.addToken(new Text(nn1), token1);
+
+    // register the tokens for renewal
+    ApplicationId applicationId_0 =  BuilderUtils.newApplicationId(0, 0);
+    localDtr.addApplication(applicationId_0, ts, true);
+    localDtr.applicationFinished(applicationId_0);
+ 
+    Thread.sleep(3000l);
+
+    //Token should still be around. Renewal should not fail.
+    token1.renew(lconf);
+
+    //Allow the keepalive time to run out
+    Thread.sleep(6000l);
+
+    //The token should have been cancelled at this point. Renewal will fail.
+    try {
+      token1.renew(lconf);
+      fail("Renewal of cancelled token should have failed");
+    } catch (InvalidToken ite) {}
+  }
+
+  /**
+   * Basic idea of the test:
+   * 0. Setup token KEEP_ALIVE
+   * 1. create tokens.
+   * 2. register them for renewal - to be cancelled on app complete
+   * 3. Complete app.
+   * 4. Verify token is alive within the KEEP_ALIVE time
+   * 5. Send an explicit KEEP_ALIVE_REQUEST
+   * 6. Verify token KEEP_ALIVE time is renewed.
+   * 7. Verify token has been cancelled after the renewed KEEP_ALIVE_TIME.
+   * @throws IOException
+   * @throws URISyntaxException
+   */
+  @Test
+  public void testDTKeepAlive2() throws Exception {
+    DelegationTokenRenewer localDtr = new DelegationTokenRenewer();
+    Configuration lconf = new Configuration(conf);
+    lconf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+    //Keep tokens alive for 6 seconds.
+    lconf.setLong(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS, 6000l);
+    //Try removing tokens every second.
+    lconf.setLong(
+        YarnConfiguration.RM_DELAYED_DELEGATION_TOKEN_REMOVAL_INTERVAL_MS,
+        1000l);
+    localDtr.init(lconf);
+    localDtr.start();
+    
+    MyFS dfs = (MyFS)FileSystem.get(lconf);
+    LOG.info("dfs="+(Object)dfs.hashCode() + ";conf="+lconf.hashCode());
+
+    Credentials ts = new Credentials();
+    // get the delegation tokens
+    MyToken token1 = dfs.getDelegationToken(new Text("user1"));
+    
+    String nn1 = DelegationTokenRenewer.SCHEME + "://host1:0";
+    ts.addToken(new Text(nn1), token1);
+
+    // register the tokens for renewal
+    ApplicationId applicationId_0 =  BuilderUtils.newApplicationId(0, 0);
+    localDtr.addApplication(applicationId_0, ts, true);
+    localDtr.applicationFinished(applicationId_0);
+
+    Thread.sleep(4000l);
+
+    //Send another keep alive.
+    localDtr.updateKeepAliveApplications(Collections
+        .singletonList(applicationId_0));
+    //Renewal should not fail.
+    token1.renew(lconf);
+
+    //Token should be around after this. 
+    Thread.sleep(4500l);
+    //Renewal should not fail: ~1.5 seconds remain before the keepalive timeout.
+    token1.renew(lconf);
+
+    //Allow the keepalive time to run out
+    Thread.sleep(3000l);
+    //The token should have been cancelled at this point. Renewal will fail.
+    try {
+      token1.renew(lconf);
+      fail("Renewal of cancelled token should have failed");
+    } catch (InvalidToken ite) {}
+  }
 }

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServices.java Wed Dec 14 19:55:22 2011
@@ -395,7 +395,7 @@ public class TestRMWebServices extends J
     nodeHealth.setHealthReport("test health report");
     nodeHealth.setIsNodeHealthy(false);
     node.handle(new RMNodeStatusEvent(nm1.getNodeId(), nodeHealth,
-        new ArrayList<ContainerStatus>(), null));
+        new ArrayList<ContainerStatus>(), null, null));
     rm.NMwaitForState(nm1.getNodeId(), RMNodeState.UNHEALTHY);
 
     JSONObject json = r.path("ws").path("v1").path("cluster").path("nodes")

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm?rev=1214429&r1=1214428&r2=1214429&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/ClusterSetup.apt.vm Wed Dec 14 19:55:22 2011
@@ -194,6 +194,10 @@ Hadoop MapReduce Next Generation - Clust
 | | | Defaults to special value of <<*>> which means <anyone>. |
 | | | Special value of just <space> means no one has access. |
 *-------------------------+-------------------------+------------------------+
+| <<<yarn.log-aggregation-enable>>> | | |
+| | <false> | |
+| | | Configuration to enable or disable log aggregation |
+*-------------------------+-------------------------+------------------------+
 
 
         * Configurations for ResourceManager:
@@ -260,10 +264,6 @@ Hadoop MapReduce Next Generation - Clust
 | | are written. | |
 | | | Multiple paths help spread disk i/o. |
 *-------------------------+-------------------------+------------------------+
-| <<<yarn.nodemanager.log-aggregation-enable>>> | | |
-| | <false> | |
-| | | Configuration to enable or disable log aggregation |
-*-------------------------+-------------------------+------------------------+
 | <<<yarn.nodemanager.log.retain-seconds>>> | | |
 | | <10800> | |
 | | | Default time (in seconds) to retain log files on the NodeManager |