You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/10/27 23:50:25 UTC

[1/2] YARN-2704. Changed ResourceManager to optionally obtain tokens itself for the sake of localization and log-aggregation for long-running services. Contributed by Jian He.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 5b1dfe78b -> a16d022ca


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
index f65fcdc..b824df7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/security/TestDelegationTokenRenewer.java
@@ -38,6 +38,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -54,6 +55,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -61,6 +63,7 @@ import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.protocolrecords.SubmitApplicationRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
@@ -74,11 +77,16 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AsyncDispatcher;
 import org.apache.hadoop.yarn.event.Event;
 import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.resourcemanager.ClientRMService;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
 import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -88,16 +96,18 @@ import org.junit.Test;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.base.Supplier;
+
 /**
  * unit test - 
  * tests addition/deletion/cancellation of renewals of delegation tokens
  *
  */
-@SuppressWarnings("rawtypes")
+@SuppressWarnings({"rawtypes", "unchecked"})
 public class TestDelegationTokenRenewer {
   private static final Log LOG = 
       LogFactory.getLog(TestDelegationTokenRenewer.class);
-  private static final Text KIND = new Text("TestDelegationTokenRenewer.Token");
+  private static final Text KIND = new Text("HDFS_DELEGATION_TOKEN");
   
   private static BlockingQueue<Event> eventQueue;
   private static volatile AtomicInteger counter;
@@ -125,6 +135,9 @@ public class TestDelegationTokenRenewer {
 
     @Override
     public long renew(Token<?> t, Configuration conf) throws IOException {
+      if ( !(t instanceof MyToken)) {
+        return DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT;
+      }
       MyToken token = (MyToken)t;
       if(token.isCanceled()) {
         throw new InvalidToken("token has been canceled");
@@ -179,8 +192,10 @@ public class TestDelegationTokenRenewer {
     dispatcher = new AsyncDispatcher(eventQueue);
     Renewer.reset();
     delegationTokenRenewer = createNewDelegationTokenRenewer(conf, counter);
-    RMContext mockContext = mock(RMContext.class);
+    RMContext mockContext =  mock(RMContext.class);
     ClientRMService mockClientRMService = mock(ClientRMService.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
     when(mockContext.getDelegationTokenRenewer()).thenReturn(
         delegationTokenRenewer);
     when(mockContext.getDispatcher()).thenReturn(dispatcher);
@@ -291,8 +306,8 @@ public class TestDelegationTokenRenewer {
     
     MyDelegationTokenSecretManager sm = new MyDelegationTokenSecretManager(
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
-        DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
         DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
+        DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_DEFAULT,
         3600000, null);
     sm.startThreads();
     
@@ -353,7 +368,7 @@ public class TestDelegationTokenRenewer {
     // register the tokens for renewal
     ApplicationId applicationId_0 = 
         BuilderUtils.newApplicationId(0, 0);
-    delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true);
+    delegationTokenRenewer.addApplicationAsync(applicationId_0, ts, true, "user");
     waitForEventsToGetProcessed(delegationTokenRenewer);
 
     // first 3 initial renewals + 1 real
@@ -393,7 +408,7 @@ public class TestDelegationTokenRenewer {
     
 
     ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
-    delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true);
+    delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, true, "user");
     waitForEventsToGetProcessed(delegationTokenRenewer);
     delegationTokenRenewer.applicationFinished(applicationId_1);
     waitForEventsToGetProcessed(delegationTokenRenewer);
@@ -429,7 +444,7 @@ public class TestDelegationTokenRenewer {
     
     // register the tokens for renewal
     ApplicationId appId =  BuilderUtils.newApplicationId(0, 0);
-    delegationTokenRenewer.addApplicationAsync(appId, ts, true);
+    delegationTokenRenewer.addApplicationAsync(appId, ts, true, "user");
     int waitCnt = 20;
     while (waitCnt-- >0) {
       if (!eventQueue.isEmpty()) {
@@ -473,7 +488,7 @@ public class TestDelegationTokenRenewer {
     
 
     ApplicationId applicationId_1 = BuilderUtils.newApplicationId(0, 1);
-    delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false);
+    delegationTokenRenewer.addApplicationAsync(applicationId_1, ts, false, "user");
     waitForEventsToGetProcessed(delegationTokenRenewer);
     delegationTokenRenewer.applicationFinished(applicationId_1);
     waitForEventsToGetProcessed(delegationTokenRenewer);
@@ -516,6 +531,8 @@ public class TestDelegationTokenRenewer {
     DelegationTokenRenewer localDtr =
         createNewDelegationTokenRenewer(lconf, counter);
     RMContext mockContext = mock(RMContext.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
     when(mockContext.getDelegationTokenRenewer()).thenReturn(
@@ -540,7 +557,7 @@ public class TestDelegationTokenRenewer {
 
     // register the tokens for renewal
     ApplicationId applicationId_0 =  BuilderUtils.newApplicationId(0, 0);
-    localDtr.addApplicationAsync(applicationId_0, ts, true);
+    localDtr.addApplicationAsync(applicationId_0, ts, true, "user");
     waitForEventsToGetProcessed(localDtr);
     if (!eventQueue.isEmpty()){
       Event evt = eventQueue.take();
@@ -593,6 +610,8 @@ public class TestDelegationTokenRenewer {
     DelegationTokenRenewer localDtr =
         createNewDelegationTokenRenewer(conf, counter);
     RMContext mockContext = mock(RMContext.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
     when(mockContext.getDelegationTokenRenewer()).thenReturn(
@@ -617,7 +636,7 @@ public class TestDelegationTokenRenewer {
 
     // register the tokens for renewal
     ApplicationId applicationId_0 =  BuilderUtils.newApplicationId(0, 0);
-    localDtr.addApplicationAsync(applicationId_0, ts, true);
+    localDtr.addApplicationAsync(applicationId_0, ts, true, "user");
     localDtr.applicationFinished(applicationId_0);
     waitForEventsToGetProcessed(delegationTokenRenewer);
     //Send another keep alive.
@@ -640,7 +659,7 @@ public class TestDelegationTokenRenewer {
 
   private DelegationTokenRenewer createNewDelegationTokenRenewer(
       Configuration conf, final AtomicInteger counter) {
-    return new DelegationTokenRenewer() {
+    DelegationTokenRenewer renew =  new DelegationTokenRenewer() {
 
       @Override
       protected ThreadPoolExecutor
@@ -664,6 +683,8 @@ public class TestDelegationTokenRenewer {
         return pool;
       }
     };
+    renew.setRMContext(TestUtils.getMockRMContext());
+    return renew;
   }
 
   private void waitForEventsToGetProcessed(DelegationTokenRenewer dtr)
@@ -679,7 +700,12 @@ public class TestDelegationTokenRenewer {
   public void testDTRonAppSubmission()
       throws IOException, InterruptedException, BrokenBarrierException {
     final Credentials credsx = new Credentials();
-    final Token<?> tokenx = mock(Token.class);
+    final Token<DelegationTokenIdentifier> tokenx = mock(Token.class);
+    when(tokenx.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
+    DelegationTokenIdentifier dtId1 = 
+        new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"),
+          new Text("user1"));
+    when(tokenx.decodeIdentifier()).thenReturn(dtId1);
     credsx.addToken(new Text("token"), tokenx);
     doReturn(true).when(tokenx).isManaged();
     doThrow(new IOException("boom"))
@@ -688,6 +714,8 @@ public class TestDelegationTokenRenewer {
     final DelegationTokenRenewer dtr =
          createNewDelegationTokenRenewer(conf, counter);
     RMContext mockContext = mock(RMContext.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);
     InetSocketAddress sockAddr =
@@ -699,7 +727,7 @@ public class TestDelegationTokenRenewer {
     dtr.start();
 
     try {
-      dtr.addApplicationSync(mock(ApplicationId.class), credsx, false);
+      dtr.addApplicationSync(mock(ApplicationId.class), credsx, false, "user");
       fail("Catch IOException on app submission");
     } catch (IOException e){
       Assert.assertTrue(e.getMessage().contains(tokenx.toString()));
@@ -716,7 +744,12 @@ public class TestDelegationTokenRenewer {
                                                                                
     // this token uses barriers to block during renew                          
     final Credentials creds1 = new Credentials();                              
-    final Token<?> token1 = mock(Token.class);                                 
+    final Token<DelegationTokenIdentifier> token1 = mock(Token.class);    
+    when(token1.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
+    DelegationTokenIdentifier dtId1 = 
+        new DelegationTokenIdentifier(new Text("user1"), new Text("renewer"),
+          new Text("user1"));
+    when(token1.decodeIdentifier()).thenReturn(dtId1);
     creds1.addToken(new Text("token"), token1);                                
     doReturn(true).when(token1).isManaged();                                   
     doAnswer(new Answer<Long>() {                                              
@@ -729,7 +762,9 @@ public class TestDelegationTokenRenewer {
                                                                                
     // this dummy token fakes renewing                                         
     final Credentials creds2 = new Credentials();                              
-    final Token<?> token2 = mock(Token.class);                                 
+    final Token<DelegationTokenIdentifier> token2 = mock(Token.class);           
+    when(token2.getKind()).thenReturn(new Text("HDFS_DELEGATION_TOKEN"));
+    when(token2.decodeIdentifier()).thenReturn(dtId1);
     creds2.addToken(new Text("token"), token2);                                
     doReturn(true).when(token2).isManaged();                                   
     doReturn(Long.MAX_VALUE).when(token2).renew(any(Configuration.class));     
@@ -737,7 +772,9 @@ public class TestDelegationTokenRenewer {
     // fire up the renewer                                                     
     final DelegationTokenRenewer dtr =
         createNewDelegationTokenRenewer(conf, counter);           
-    RMContext mockContext = mock(RMContext.class);                             
+    RMContext mockContext = mock(RMContext.class);
+    when(mockContext.getSystemCredentialsForApps()).thenReturn(
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>());
     ClientRMService mockClientRMService = mock(ClientRMService.class);         
     when(mockContext.getClientRMService()).thenReturn(mockClientRMService);    
     InetSocketAddress sockAddr =                                               
@@ -751,14 +788,14 @@ public class TestDelegationTokenRenewer {
     Thread submitThread = new Thread() {                                       
       @Override                                                                
       public void run() {
-        dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false);
+        dtr.addApplicationAsync(mock(ApplicationId.class), creds1, false, "user");
       }                                                                        
     };                                                                         
     submitThread.start();                                                      
                                                                                
     // wait till 1st submit blocks, then submit another
     startBarrier.await();                           
-    dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false);
+    dtr.addApplicationAsync(mock(ApplicationId.class), creds2, false, "user");
     // signal 1st to complete                                                  
     endBarrier.await();                                                        
     submitThread.join(); 
@@ -793,4 +830,139 @@ public class TestDelegationTokenRenewer {
           "Bad header found in token storage"));
     }
   }
+
+
+  @Test (timeout = 20000)
+  public void testReplaceExpiringDelegationToken() throws Exception {
+    conf.setBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED, true);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+      "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+
+    // create Token1:
+    Text userText1 = new Text("user1");
+    DelegationTokenIdentifier dtId1 =
+        new DelegationTokenIdentifier(userText1, new Text("renewer1"),
+          userText1);
+    // set max date to 0 to simulate an expiring token;
+    dtId1.setMaxDate(0);
+    final Token<DelegationTokenIdentifier> token1 =
+        new Token<DelegationTokenIdentifier>(dtId1.getBytes(),
+          "password1".getBytes(), dtId1.getKind(), new Text("service1"));
+
+    // create token2
+    Text userText2 = new Text("user2");
+    DelegationTokenIdentifier dtId2 =
+        new DelegationTokenIdentifier(userText1, new Text("renewer2"),
+          userText2);
+    final Token<DelegationTokenIdentifier> expectedToken =
+        new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
+          "password2".getBytes(), dtId2.getKind(), new Text("service2"));
+
+    final MockRM rm = new TestSecurityMockRM(conf, null) {
+      @Override
+      protected DelegationTokenRenewer createDelegationTokenRenewer() {
+        return new DelegationTokenRenewer() {
+          @Override
+          protected Token<?>[] obtainSystemTokensForUser(String user,
+              final Credentials credentials) throws IOException {
+            credentials.addToken(expectedToken.getService(), expectedToken);
+            return new Token<?>[] { expectedToken };
+          }
+        };
+      }
+    };
+    rm.start();
+    Credentials credentials = new Credentials();
+    credentials.addToken(userText1, token1);
+
+    RMApp app =
+        rm.submitApp(200, "name", "user",
+          new HashMap<ApplicationAccessType, String>(), false, "default", 1,
+          credentials);
+
+    // wait for the initial expiring hdfs token to be removed.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        return !rm.getRMContext().getDelegationTokenRenewer()
+          .getDelegationTokens().contains(token1);
+      }
+    }, 1000, 20000);
+
+    // wait for the new retrieved hdfs token.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        return rm.getRMContext().getDelegationTokenRenewer()
+          .getDelegationTokens().contains(expectedToken);
+      }
+    }, 1000, 20000);
+
+    // check nm can retrieve the token
+    final MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm1.registerNode();
+    NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
+    ByteBuffer tokenBuffer =
+        response.getSystemCredentialsForApps().get(app.getApplicationId());
+    Assert.assertNotNull(tokenBuffer);
+    Credentials appCredentials = new Credentials();
+    DataInputByteBuffer buf = new DataInputByteBuffer();
+    tokenBuffer.rewind();
+    buf.reset(tokenBuffer);
+    appCredentials.readTokenStorageStream(buf);
+    Assert.assertTrue(appCredentials.getAllTokens().contains(expectedToken));
+  }
+
+  // YARN will get the token for the app submitted without the delegation token.
+  @Test
+  public void testAppSubmissionWithoutDelegationToken() throws Exception {
+    // create token2
+    Text userText2 = new Text("user2");
+    DelegationTokenIdentifier dtId2 =
+        new DelegationTokenIdentifier(new Text("user2"), new Text("renewer2"),
+          userText2);
+    final Token<DelegationTokenIdentifier> token2 =
+        new Token<DelegationTokenIdentifier>(dtId2.getBytes(),
+          "password2".getBytes(), dtId2.getKind(), new Text("service2"));
+    final MockRM rm = new TestSecurityMockRM(conf, null) {
+      @Override
+      protected DelegationTokenRenewer createDelegationTokenRenewer() {
+        return new DelegationTokenRenewer() {
+          @Override
+          protected Token<?>[] obtainSystemTokensForUser(String user,
+              final Credentials credentials) throws IOException {
+            credentials.addToken(token2.getService(), token2);
+            return new Token<?>[] { token2 };
+          }
+        };
+      }
+    };
+    rm.start();
+
+    // submit an app without delegationToken
+    RMApp app = rm.submitApp(200);
+
+    // wait for the new retrieved hdfs token.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      public Boolean get() {
+        return rm.getRMContext().getDelegationTokenRenewer()
+          .getDelegationTokens().contains(token2);
+      }
+    }, 1000, 20000);
+
+    // check nm can retrieve the token
+    final MockNM nm1 =
+        new MockNM("127.0.0.1:1234", 15120, rm.getResourceTrackerService());
+    nm1.registerNode();
+    NodeHeartbeatResponse response = nm1.nodeHeartbeat(true);
+    ByteBuffer tokenBuffer =
+        response.getSystemCredentialsForApps().get(app.getApplicationId());
+    Assert.assertNotNull(tokenBuffer);
+    Credentials appCredentials = new Credentials();
+    DataInputByteBuffer buf = new DataInputByteBuffer();
+    tokenBuffer.rewind();
+    buf.reset(tokenBuffer);
+    appCredentials.readTokenStorageStream(buf);
+    Assert.assertTrue(appCredentials.getAllTokens().contains(token2));
+  }
 }


[2/2] git commit: YARN-2704. Changed ResourceManager to optionally obtain tokens itself for the sake of localization and log-aggregation for long-running services. Contributed by Jian He.

Posted by vi...@apache.org.
YARN-2704. Changed ResourceManager to optionally obtain tokens itself for the sake of localization and log-aggregation for long-running services. Contributed by Jian He.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a16d022c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a16d022c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a16d022c

Branch: refs/heads/trunk
Commit: a16d022ca4313a41425c8e97841c841a2d6f2f54
Parents: 5b1dfe7
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Mon Oct 27 15:49:47 2014 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Mon Oct 27 15:49:47 2014 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   4 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |   4 +
 .../src/main/resources/yarn-default.xml         |  15 ++
 .../protocolrecords/NodeHeartbeatResponse.java  |   9 +
 .../impl/pb/NodeHeartbeatResponsePBImpl.java    |  56 +++-
 .../yarn_server_common_service_protos.proto     |   6 +
 .../protocolrecords/TestProtocolRecords.java    |  42 ++-
 .../hadoop/yarn/server/nodemanager/Context.java |   4 +
 .../yarn/server/nodemanager/NodeManager.java    |  17 ++
 .../nodemanager/NodeStatusUpdaterImpl.java      |  30 +++
 .../containermanager/ContainerManagerImpl.java  |   6 +-
 .../localizer/ResourceLocalizationService.java  |  33 ++-
 .../logaggregation/LogAggregationService.java   |  16 +-
 .../nodemanager/DummyContainerManager.java      |   5 +-
 .../nodemanager/TestNodeStatusUpdater.java      |  20 ++
 .../TestContainerManagerRecovery.java           |   5 +-
 .../TestLocalCacheDirectoryManager.java         |  11 +-
 .../TestResourceLocalizationService.java        |  40 +--
 .../server/resourcemanager/RMAppManager.java    |   6 +-
 .../yarn/server/resourcemanager/RMContext.java  |   3 +
 .../server/resourcemanager/RMContextImpl.java   |   8 +
 .../resourcemanager/ResourceTrackerService.java |  11 +-
 .../security/DelegationTokenRenewer.java        | 266 ++++++++++++++-----
 .../security/TestDelegationTokenRenewer.java    | 208 +++++++++++++--
 24 files changed, 701 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 74b827d..b32132b 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -400,6 +400,10 @@ Release 2.6.0 - UNRELEASED
     YARN-2703. Added logUploadedTime into LogValue for better display. (Xuan Gong
     via zjshen)
 
+    YARN-2704. Changed ResourceManager to optionally obtain tokens itself for the
+    sake of localization and log-aggregation for long-running services. (Jian He
+    via vinodkv)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index d552e9c..d8ed541 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -695,6 +695,10 @@ public class YarnConfiguration extends Configuration {
       RM_PREFIX + "delegation-token-renewer.thread-count";
   public static final int DEFAULT_RM_DELEGATION_TOKEN_RENEWER_THREAD_COUNT = 50;
 
+  public static final String RM_PROXY_USER_PRIVILEGES_ENABLED = RM_PREFIX
+      + "proxy-user-privileges.enabled";
+  public static boolean DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED = false;
+
   /** Whether to enable log aggregation */
   public static final String LOG_AGGREGATION_ENABLED = YARN_PREFIX
       + "log-aggregation-enable";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index e364ba8..d02931f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -554,6 +554,21 @@
   </property>
 
   <property>
+  <description>If true, ResourceManager will have proxy-user privileges.
+    Use case: In a secure cluster, YARN requires the user hdfs delegation-tokens to
+    do localization and log-aggregation on behalf of the user. If this is set to true,
+    ResourceManager is able to request new hdfs delegation tokens on behalf of
+    the user. This is needed by long-running-service, because the hdfs tokens
+    will eventually expire and YARN requires new valid tokens to do localization
+    and log-aggregation. Note that to enable this use case, the corresponding
+    HDFS NameNode has to configure ResourceManager as the proxy-user so that
+    ResourceManager can itself ask for new tokens on behalf of the user when
+    tokens are past their max-life-time.</description>
+    <name>yarn.resourcemanager.proxy-user-privileges.enabled</name>
+    <value>false</value>
+  </property>
+
+  <property>
     <description>Interval for the roll over for the master key used to generate
         application tokens
     </description>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 12e1f54..9fb44ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -18,7 +18,9 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
+import java.nio.ByteBuffer;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -58,4 +60,11 @@ public interface NodeHeartbeatResponse {
   String getDiagnosticsMessage();
 
   void setDiagnosticsMessage(String diagnosticsMessage);
+
+  // Credentials (i.e. hdfs tokens) needed by NodeManagers for application
+  // localizations and logAggreations.
+  Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps();
+
+  void setSystemCredentialsForApps(
+      Map<ApplicationId, ByteBuffer> systemCredentials);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index 78979d5..1e91514 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -18,21 +18,26 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
+import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
+import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
@@ -49,6 +54,8 @@ public class NodeHeartbeatResponsePBImpl extends
   private List<ContainerId> containersToCleanup = null;
   private List<ContainerId> containersToBeRemovedFromNM = null;
   private List<ApplicationId> applicationsToCleanup = null;
+  private Map<ApplicationId, ByteBuffer> systemCredentials = null;
+
   private MasterKey containerTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
   
@@ -62,7 +69,7 @@ public class NodeHeartbeatResponsePBImpl extends
   }
   
   public NodeHeartbeatResponseProto getProto() {
-      mergeLocalToProto();
+    mergeLocalToProto();
     proto = viaProto ? proto : builder.build();
     viaProto = true;
     return proto;
@@ -86,6 +93,19 @@ public class NodeHeartbeatResponsePBImpl extends
       builder.setNmTokenMasterKey(
           convertToProtoFormat(this.nmTokenMasterKey));
     }
+    if (this.systemCredentials != null) {
+      addSystemCredentialsToProto();
+    }
+  }
+
+  private void addSystemCredentialsToProto() {
+    maybeInitBuilder();
+    builder.clearSystemCredentialsForApps();
+    for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentials.entrySet()) {
+      builder.addSystemCredentialsForApps(SystemCredentialsForAppsProto.newBuilder()
+        .setAppId(convertToProtoFormat(entry.getKey()))
+        .setCredentialsForApp(ProtoUtils.convertToProtoFormat(entry.getValue())));
+    }
   }
 
   private void mergeLocalToProto() {
@@ -387,6 +407,38 @@ public class NodeHeartbeatResponsePBImpl extends
     builder.addAllApplicationsToCleanup(iterable);
   }
 
+
+  @Override
+  public Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
+    if (this.systemCredentials != null) {
+      return this.systemCredentials;
+    }
+    initSystemCredentials();
+    return systemCredentials;
+  }
+
+  private void initSystemCredentials() {
+    NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<SystemCredentialsForAppsProto> list = p.getSystemCredentialsForAppsList();
+    this.systemCredentials = new HashMap<ApplicationId, ByteBuffer> ();
+    for (SystemCredentialsForAppsProto c : list) {
+      ApplicationId appId = convertFromProtoFormat(c.getAppId());
+      ByteBuffer byteBuffer = ProtoUtils.convertFromProtoFormat(c.getCredentialsForApp());
+      this.systemCredentials.put(appId, byteBuffer);
+    }
+  }
+
+  @Override
+  public void setSystemCredentialsForApps(
+      Map<ApplicationId, ByteBuffer> systemCredentials) {
+    if (systemCredentials == null || systemCredentials.isEmpty()) {
+      return;
+    }
+    maybeInitBuilder();
+    this.systemCredentials = new HashMap<ApplicationId, ByteBuffer>();
+    this.systemCredentials.putAll(systemCredentials);
+  }
+
   @Override
   public long getNextHeartBeatInterval() {
     NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index d0990fb..f2d01ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -59,6 +59,12 @@ message NodeHeartbeatResponseProto {
   optional int64 nextHeartBeatInterval = 7;
   optional string diagnostics_message = 8;
   repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
+  repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
+}
+
+message SystemCredentialsForAppsProto {
+  optional ApplicationIdProto appId = 1;
+  optional bytes credentialsForApp = 2;
 }
 
 message NMContainerStatusProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
index 7165445..ed902ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/api/protocolrecords/TestProtocolRecords.java
@@ -18,9 +18,18 @@
 
 package org.apache.hadoop.yarn.server.api.protocolrecords;
 
+import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -29,10 +38,10 @@ import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
-import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NMContainerStatusPBImpl;
+import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.NodeHeartbeatResponsePBImpl;
 import org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb.RegisterNodeManagerRequestPBImpl;
+import org.apache.hadoop.yarn.util.Records;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -93,4 +102,33 @@ public class TestProtocolRecords {
     Assert.assertEquals(1, requestProto.getRunningApplications().size());
     Assert.assertEquals(appId, requestProto.getRunningApplications().get(0)); 
   }
+
+  @Test
+  public void testNodeHeartBeatResponse() throws IOException {
+    NodeHeartbeatResponse record =
+        Records.newRecord(NodeHeartbeatResponse.class);
+    Map<ApplicationId, ByteBuffer> appCredentials =
+        new HashMap<ApplicationId, ByteBuffer>();
+    Credentials app1Cred = new Credentials();
+
+    Token<DelegationTokenIdentifier> token1 =
+        new Token<DelegationTokenIdentifier>();
+    token1.setKind(new Text("kind1"));
+    app1Cred.addToken(new Text("token1"), token1);
+    Token<DelegationTokenIdentifier> token2 =
+        new Token<DelegationTokenIdentifier>();
+    token2.setKind(new Text("kind2"));
+    app1Cred.addToken(new Text("token2"), token2);
+
+    DataOutputBuffer dob = new DataOutputBuffer();
+    app1Cred.writeTokenStorageToStream(dob);
+    ByteBuffer byteBuffer1 = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1);
+    record.setSystemCredentialsForApps(appCredentials);
+
+    NodeHeartbeatResponse proto =
+        new NodeHeartbeatResponsePBImpl(
+          ((NodeHeartbeatResponsePBImpl) record).getProto());
+    Assert.assertEquals(appCredentials, proto.getSystemCredentialsForApps());
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 956ea33..6e7e2ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -18,8 +18,10 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import java.util.Map;
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -54,6 +56,8 @@ public interface Context {
 
   ConcurrentMap<ApplicationId, Application> getApplications();
 
+  Map<ApplicationId, Credentials> getSystemCredentialsForApps();
+
   ConcurrentMap<ContainerId, Container> getContainers();
 
   NMContainerTokenSecretManager getContainerTokenSecretManager();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 43770c1..22057f4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.nodemanager;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -32,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.service.CompositeService;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -313,6 +316,10 @@ public class NodeManager extends CompositeService
     private NodeId nodeId = null;
     protected final ConcurrentMap<ApplicationId, Application> applications =
         new ConcurrentHashMap<ApplicationId, Application>();
+
+    private Map<ApplicationId, Credentials> systemCredentials =
+        new HashMap<ApplicationId, Credentials>();
+
     protected final ConcurrentMap<ContainerId, Container> containers =
         new ConcurrentSkipListMap<ContainerId, Container>();
 
@@ -420,6 +427,16 @@ public class NodeManager extends CompositeService
     public void setDecommissioned(boolean isDecommissioned) {
       this.isDecommissioned = isDecommissioned;
     }
+
+    @Override
+    public Map<ApplicationId, Credentials> getSystemCredentialsForApps() {
+      return systemCredentials;
+    }
+
+    public void setSystemCrendentials(
+        Map<ApplicationId, Credentials> systemCredentials) {
+      this.systemCredentials = systemCredentials;
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index bed58f5..1c3ac5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.nodemanager;
 
 import java.io.IOException;
 import java.net.ConnectException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
@@ -36,7 +37,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.VersionUtil;
@@ -62,6 +65,7 @@ import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 import org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.api.records.NodeStatus;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
@@ -525,6 +529,25 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     return this.rmIdentifier;
   }
 
+  private static Map<ApplicationId, Credentials> parseCredentials(
+      Map<ApplicationId, ByteBuffer> systemCredentials) throws IOException {
+    Map<ApplicationId, Credentials> map =
+        new HashMap<ApplicationId, Credentials>();
+    for (Map.Entry<ApplicationId, ByteBuffer> entry : systemCredentials.entrySet()) {
+      Credentials credentials = new Credentials();
+      DataInputByteBuffer buf = new DataInputByteBuffer();
+      ByteBuffer buffer = entry.getValue();
+      buffer.rewind();
+      buf.reset(buffer);
+      credentials.readTokenStorageStream(buf);
+      map.put(entry.getKey(), credentials);
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Retrieved credentials form RM: " + map);
+    }
+    return map;
+  }
+
   protected void startStatusUpdater() {
 
     statusUpdaterRunnable = new Runnable() {
@@ -598,6 +621,13 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
                   new CMgrCompletedAppsEvent(appsToCleanup,
                       CMgrCompletedAppsEvent.Reason.BY_RESOURCEMANAGER));
             }
+
+            Map<ApplicationId, ByteBuffer> systemCredentials =
+                response.getSystemCredentialsForApps();
+            if (systemCredentials != null && !systemCredentials.isEmpty()) {
+              ((NMContext) context)
+                .setSystemCrendentials(parseCredentials(systemCredentials));
+            }
           } catch (ConnectException e) {
             //catch and throw the exception if tried MAX wait time to connect RM
             dispatcher.getEventHandler().handle(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index e8001ff..35b232f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -186,7 +186,7 @@ public class ContainerManagerImpl extends CompositeService implements
     this.metrics = metrics;
 
     rsrcLocalizationSrvc =
-        createResourceLocalizationService(exec, deletionContext);
+        createResourceLocalizationService(exec, deletionContext, context);
     addService(rsrcLocalizationSrvc);
 
     containersLauncher = createContainersLauncher(context, exec);
@@ -362,9 +362,9 @@ public class ContainerManagerImpl extends CompositeService implements
   }
 
   protected ResourceLocalizationService createResourceLocalizationService(
-      ContainerExecutor exec, DeletionService deletionContext) {
+      ContainerExecutor exec, DeletionService deletionContext, Context context) {
     return new ResourceLocalizationService(this.dispatcher, exec,
-        deletionContext, dirsHandler, context.getNMStateStore());
+        deletionContext, dirsHandler, context);
   }
 
   protected ContainersLauncher createContainersLauncher(Context context,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index a6143a2..549d8e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -83,11 +83,11 @@ import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.ipc.RPCUtil;
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.proto.YarnProtos.LocalResourceProto;
 import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.LocalizedResourceProto;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService.FileDeletionTask;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
@@ -158,6 +158,7 @@ public class ResourceLocalizationService extends CompositeService
   private LocalResourcesTracker publicRsrc;
 
   private LocalDirsHandlerService dirsHandler;
+  private Context nmContext;
 
   /**
    * Map of LocalResourceTrackers keyed by username, for private
@@ -177,7 +178,7 @@ public class ResourceLocalizationService extends CompositeService
 
   public ResourceLocalizationService(Dispatcher dispatcher,
       ContainerExecutor exec, DeletionService delService,
-      LocalDirsHandlerService dirsHandler, NMStateStoreService stateStore) {
+      LocalDirsHandlerService dirsHandler, Context context) {
 
     super(ResourceLocalizationService.class.getName());
     this.exec = exec;
@@ -189,7 +190,8 @@ public class ResourceLocalizationService extends CompositeService
         new ThreadFactoryBuilder()
           .setNameFormat("ResourceLocalizationService Cache Cleanup")
           .build());
-    this.stateStore = stateStore;
+    this.stateStore = context.getNMStateStore();
+    this.nmContext = context;
   }
 
   FileContext getLocalFileContext(Configuration conf) {
@@ -1110,11 +1112,36 @@ public class ResourceLocalizationService extends CompositeService
       }
     }
 
+    private Credentials getSystemCredentialsSentFromRM(
+        LocalizerContext localizerContext) throws IOException {
+      ApplicationId appId =
+          localizerContext.getContainerId().getApplicationAttemptId()
+            .getApplicationId();
+      Credentials systemCredentials =
+          nmContext.getSystemCredentialsForApps().get(appId);
+      if (systemCredentials == null) {
+        return null;
+      }
+      LOG.info("Adding new framework tokens from RM for " + appId);
+      for (Token<?> token : systemCredentials.getAllTokens()) {
+        LOG.info("Adding new application-token for localization: " + token);
+      }
+      return systemCredentials;
+    }
+    
     private void writeCredentials(Path nmPrivateCTokensPath)
         throws IOException {
       DataOutputStream tokenOut = null;
       try {
         Credentials credentials = context.getCredentials();
+        if (UserGroupInformation.isSecurityEnabled()) {
+          Credentials systemCredentials =
+              getSystemCredentialsSentFromRM(context);
+          if (systemCredentials != null) {
+            credentials = systemCredentials;
+          }
+        }
+
         FileContext lfs = getLocalFileContext(getConfig());
         tokenOut =
             lfs.create(nmPrivateCTokensPath, EnumSet.of(CREATE, OVERWRITE));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 77176b7..cc717d7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -39,9 +39,10 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -60,6 +61,7 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.eve
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerAppStartedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerContainerFinishedEvent;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
+
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
@@ -342,6 +344,18 @@ public class LogAggregationService extends AbstractService implements
       Map<ApplicationAccessType, String> appAcls,
       LogAggregationContext logAggregationContext) {
 
+    if (UserGroupInformation.isSecurityEnabled()) {
+      Credentials systemCredentials =
+          context.getSystemCredentialsForApps().get(appId);
+      if (systemCredentials != null) {
+        LOG.info("Adding new framework tokens from RM for " + appId);
+        for (Token<?> token : systemCredentials.getAllTokens()) {
+          LOG.info("Adding new application-token for log-aggregation: " + token);
+        }
+        credentials = systemCredentials;
+      }
+    }
+
     // Get user's FileSystem credentials
     final UserGroupInformation userUgi =
         UserGroupInformation.createRemoteUser(user);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
index 5753fb8..f872a55 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
@@ -54,7 +54,6 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEvent;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
-import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 
 public class DummyContainerManager extends ContainerManagerImpl {
@@ -74,9 +73,9 @@ public class DummyContainerManager extends ContainerManagerImpl {
   @Override
   @SuppressWarnings("unchecked")
   protected ResourceLocalizationService createResourceLocalizationService(
-      ContainerExecutor exec, DeletionService deletionContext) {
+      ContainerExecutor exec, DeletionService deletionContext, Context context) {
     return new ResourceLocalizationService(super.dispatcher, exec,
-        deletionContext, super.dirsHandler, new NMNullStateStoreService()) {
+        deletionContext, super.dirsHandler, context) {
       @Override
       public void handle(LocalizationEvent event) {
         switch (event.getType()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 7593ce6..5c2dd2c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -44,10 +44,14 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
 import org.apache.hadoop.service.Service.STATE;
 import org.apache.hadoop.service.ServiceOperations;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -561,6 +565,7 @@ public class TestNodeStatusUpdater {
 
   // Test NodeStatusUpdater sends the right container statuses each time it
   // heart beats.
+  private Credentials expectedCredentials = new Credentials();
   private class MyResourceTracker4 implements ResourceTracker {
 
     public NodeAction registerNodeAction = NodeAction.NORMAL;
@@ -576,6 +581,11 @@ public class TestNodeStatusUpdater {
         createContainerStatus(5, ContainerState.COMPLETE);
 
     public MyResourceTracker4(Context context) {
+      // create app Credentials
+      org.apache.hadoop.security.token.Token<DelegationTokenIdentifier> token1 =
+          new org.apache.hadoop.security.token.Token<DelegationTokenIdentifier>();
+      token1.setKind(new Text("kind1"));
+      expectedCredentials.addToken(new Text("token1"), token1);
       this.context = context;
     }
 
@@ -694,6 +704,14 @@ public class TestNodeStatusUpdater {
           YarnServerBuilderUtils.newNodeHeartbeatResponse(heartBeatID,
             heartBeatNodeAction, null, null, null, null, 1000L);
       nhResponse.addContainersToBeRemovedFromNM(finishedContainersPulledByAM);
+      Map<ApplicationId, ByteBuffer> appCredentials =
+          new HashMap<ApplicationId, ByteBuffer>();
+      DataOutputBuffer dob = new DataOutputBuffer();
+      expectedCredentials.writeTokenStorageToStream(dob);
+      ByteBuffer byteBuffer1 =
+          ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+      appCredentials.put(ApplicationId.newInstance(1234, 1), byteBuffer1);
+      nhResponse.setSystemCredentialsForApps(appCredentials);
       return nhResponse;
     }
   }
@@ -1293,6 +1311,8 @@ public class TestNodeStatusUpdater {
     if(assertionFailedInThread.get()) {
       Assert.fail("ContainerStatus Backup failed");
     }
+    Assert.assertNotNull(nm.getNMContext().getSystemCredentialsForApps()
+      .get(ApplicationId.newInstance(1234, 1)).getToken(new Text("token1")));
     nm.stop();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
index 7850a1c..007fc36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManagerRecovery.java
@@ -278,8 +278,7 @@ public class TestContainerManagerRecovery {
   private ContainerManagerImpl createContainerManager(Context context) {
     final LogHandler logHandler = mock(LogHandler.class);
     final ResourceLocalizationService rsrcSrv =
-        new ResourceLocalizationService(null, null, null, null,
-            context.getNMStateStore()) {
+        new ResourceLocalizationService(null, null, null, null, context) {
           @Override
           public void serviceInit(Configuration conf) throws Exception {
           }
@@ -320,7 +319,7 @@ public class TestContainerManagerRecovery {
 
           @Override
           protected ResourceLocalizationService createResourceLocalizationService(
-              ContainerExecutor exec, DeletionService deletionContext) {
+              ContainerExecutor exec, DeletionService deletionContext, Context context) {
             return rsrcSrv;
           }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
index 503ce8c..9e08b7f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestLocalCacheDirectoryManager.java
@@ -23,7 +23,12 @@ import org.junit.Assert;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.LocalCacheDirectoryManager.Directory;
+import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.junit.Test;
 
 public class TestLocalCacheDirectoryManager {
@@ -73,8 +78,12 @@ public class TestLocalCacheDirectoryManager {
     YarnConfiguration conf = new YarnConfiguration();
     conf.set(YarnConfiguration.NM_LOCAL_CACHE_MAX_FILES_PER_DIRECTORY, "1");
     Exception e = null;
+    NMContext nmContext =
+        new NMContext(new NMContainerTokenSecretManager(conf),
+          new NMTokenSecretManagerInNM(), null,
+          new ApplicationACLsManager(conf), new NMNullStateStoreService());
     ResourceLocalizationService service =
-        new ResourceLocalizationService(null, null, null, null, null);
+        new ResourceLocalizationService(null, null, null, null, nmContext);
     try {
       service.init(conf);
     } catch (Exception e1) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
index cf1e9fa..bf36651 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/TestResourceLocalizationService.java
@@ -105,6 +105,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.nodemanager.ContainerExecutor;
 import org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.api.ResourceLocalizationSpec;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalResourceStatus;
 import org.apache.hadoop.yarn.server.nodemanager.api.protocolrecords.LocalizerAction;
@@ -138,6 +139,9 @@ import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMNullStateStoreService;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.server.nodemanager.security.NMTokenSecretManagerInNM;
+import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
@@ -159,7 +163,7 @@ public class TestResourceLocalizationService {
   private Configuration conf;
   private AbstractFileSystem spylfs;
   private FileContext lfs;
-  
+  private NMContext nmContext;
   @BeforeClass
   public static void setupClass() {
     mockServer = mock(Server.class);
@@ -174,6 +178,9 @@ public class TestResourceLocalizationService {
 
     String logDir = lfs.makeQualified(new Path(basedir, "logdir ")).toString();
     conf.set(YarnConfiguration.NM_LOG_DIRS, logDir);
+    nmContext = new NMContext(new NMContainerTokenSecretManager(
+      conf), new NMTokenSecretManagerInNM(), null,
+      new ApplicationACLsManager(conf), new NMNullStateStoreService());
   }
 
   @After
@@ -206,8 +213,7 @@ public class TestResourceLocalizationService {
 
     ResourceLocalizationService locService =
       spy(new ResourceLocalizationService(dispatcher, exec, delService,
-                                          diskhandler,
-                                          new NMNullStateStoreService()));
+                                          diskhandler, nmContext));
     doReturn(lfs)
       .when(locService).getLocalFileContext(isA(Configuration.class));
     try {
@@ -268,8 +274,7 @@ public class TestResourceLocalizationService {
 
     ResourceLocalizationService locService =
         spy(new ResourceLocalizationService(dispatcher, exec, delService,
-            diskhandler,
-            nmStateStoreService));
+            diskhandler,nmContext));
     doReturn(lfs)
         .when(locService).getLocalFileContext(isA(Configuration.class));
     try {
@@ -340,8 +345,7 @@ public class TestResourceLocalizationService {
 
     ResourceLocalizationService rawService =
       new ResourceLocalizationService(dispatcher, exec, delService,
-                                      dirsHandler,
-                                      new NMNullStateStoreService());
+                                      dirsHandler, nmContext);
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
     doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(
@@ -751,8 +755,7 @@ public class TestResourceLocalizationService {
 
     ResourceLocalizationService rawService =
       new ResourceLocalizationService(dispatcher, exec, delService,
-                                      dirsHandler,
-                                      new NMNullStateStoreService());
+                                      dirsHandler, nmContext);
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
     doReturn(lfs).when(spyService).getLocalFileContext(isA(Configuration.class));
@@ -965,8 +968,7 @@ public class TestResourceLocalizationService {
     try {
       ResourceLocalizationService rawService =
           new ResourceLocalizationService(dispatcher, exec, delService,
-                                        dirsHandler,
-                                        new NMNullStateStoreService());
+                                        dirsHandler, nmContext);
       ResourceLocalizationService spyService = spy(rawService);
       doReturn(mockServer).when(spyService).createServer();
       doReturn(lfs).when(spyService).getLocalFileContext(
@@ -1075,7 +1077,7 @@ public class TestResourceLocalizationService {
     try {
       ResourceLocalizationService rawService =
           new ResourceLocalizationService(dispatcher, exec, delService,
-            dirsHandlerSpy, new NMNullStateStoreService());
+            dirsHandlerSpy, nmContext);
       ResourceLocalizationService spyService = spy(rawService);
       doReturn(mockServer).when(spyService).createServer();
       doReturn(lfs).when(spyService).getLocalFileContext(
@@ -1188,7 +1190,7 @@ public class TestResourceLocalizationService {
 
       ResourceLocalizationService rls =
           new ResourceLocalizationService(dispatcher1, exec, delService,
-            localDirHandler, new NMNullStateStoreService());
+            localDirHandler, nmContext);
       dispatcher1.register(LocalizationEventType.class, rls);
       rls.init(conf);
 
@@ -1341,7 +1343,7 @@ public class TestResourceLocalizationService {
 
       ResourceLocalizationService rls =
           new ResourceLocalizationService(dispatcher1, exec, delService,
-            localDirHandler, new NMNullStateStoreService());
+            localDirHandler, nmContext);
       dispatcher1.register(LocalizationEventType.class, rls);
       rls.init(conf);
 
@@ -1507,7 +1509,7 @@ public class TestResourceLocalizationService {
       // it as otherwise it will remove requests from pending queue.
       ResourceLocalizationService rawService =
           new ResourceLocalizationService(dispatcher1, exec, delService,
-            dirsHandler, new NMNullStateStoreService());
+            dirsHandler, nmContext);
       ResourceLocalizationService spyService = spy(rawService);
       dispatcher1.register(LocalizationEventType.class, spyService);
       spyService.init(conf);
@@ -1795,9 +1797,13 @@ public class TestResourceLocalizationService {
     ContainerExecutor exec = mock(ContainerExecutor.class);
     LocalizerTracker mockLocalizerTracker = mock(LocalizerTracker.class);
     DeletionService delService = mock(DeletionService.class);
+    NMContext nmContext =
+        new NMContext(new NMContainerTokenSecretManager(conf),
+          new NMTokenSecretManagerInNM(), null,
+          new ApplicationACLsManager(conf), stateStore);
     ResourceLocalizationService rawService =
       new ResourceLocalizationService(dispatcher, exec, delService,
-                                      dirsHandler, stateStore);
+                                      dirsHandler, nmContext);
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
     doReturn(mockLocalizerTracker).when(spyService).createLocalizerTracker(
@@ -1861,7 +1867,7 @@ public class TestResourceLocalizationService {
     // setup mocks
     ResourceLocalizationService rawService =
         new ResourceLocalizationService(dispatcher, exec, delService,
-          mockDirsHandler, new NMNullStateStoreService());
+          mockDirsHandler, nmContext);
     ResourceLocalizationService spyService = spy(rawService);
     doReturn(mockServer).when(spyService).createServer();
     doReturn(mockLocallilzerTracker).when(spyService).createLocalizerTracker(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 6e1b925..63333b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -278,7 +278,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
       try {
         credentials = parseCredentials(submissionContext);
         this.rmContext.getDelegationTokenRenewer().addApplicationAsync(appId,
-          credentials, submissionContext.getCancelTokensWhenComplete());
+          credentials, submissionContext.getCancelTokensWhenComplete(),
+          application.getUser());
       } catch (Exception e) {
         LOG.warn("Unable to parse credentials.", e);
         // Sending APP_REJECTED is fine, since we assume that the
@@ -325,7 +326,8 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
         credentials = parseCredentials(appContext);
         // synchronously renew delegation token on recovery.
         rmContext.getDelegationTokenRenewer().addApplicationSync(appId,
-          credentials, appContext.getCancelTokensWhenComplete());
+          credentials, appContext.getCancelTokensWhenComplete(),
+          application.getUser());
         application.handle(new RMAppEvent(appId, RMAppEventType.RECOVER));
       } catch (Exception e) {
         LOG.warn("Unable to parse and renew delegation tokens.", e);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index e824634..56984e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
@@ -57,6 +58,8 @@ public interface RMContext {
 
   ConcurrentMap<ApplicationId, RMApp> getRMApps();
   
+  ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps();
+
   ConcurrentMap<String, RMNode> getInactiveRMNodes();
 
   ConcurrentMap<NodeId, RMNode> getRMNodes();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 076c3dd..7c1db3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager;
 
+import java.nio.ByteBuffer;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
@@ -67,6 +68,9 @@ public class RMContextImpl implements RMContext {
   private final ConcurrentMap<String, RMNode> inactiveNodes
     = new ConcurrentHashMap<String, RMNode>();
 
+  private final ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
+      new ConcurrentHashMap<ApplicationId, ByteBuffer>();
+
   private boolean isHAEnabled;
   private boolean isWorkPreservingRecoveryEnabled;
   private HAServiceState haServiceState =
@@ -444,4 +448,8 @@ public class RMContextImpl implements RMContext {
   public void setSystemClock(Clock clock) {
     this.systemClock = clock;
   }
+
+  public ConcurrentMap<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
+    return systemCredentials;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index f5583bc..4beb895 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,6 +33,7 @@ import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.VersionUtil;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
@@ -385,7 +388,7 @@ public class ResourceTrackerService extends AbstractService implements
     if (remoteNodeStatus.getResponseId() + 1 == lastNodeHeartbeatResponse
         .getResponseId()) {
       LOG.info("Received duplicate heartbeat from node "
-          + rmNode.getNodeAddress());
+          + rmNode.getNodeAddress()+ " responseId=" + remoteNodeStatus.getResponseId());
       return lastNodeHeartbeatResponse;
     } else if (remoteNodeStatus.getResponseId() + 1 < lastNodeHeartbeatResponse
         .getResponseId()) {
@@ -410,6 +413,12 @@ public class ResourceTrackerService extends AbstractService implements
 
     populateKeys(request, nodeHeartBeatResponse);
 
+    ConcurrentMap<ApplicationId, ByteBuffer> systemCredentials =
+        rmContext.getSystemCredentialsForApps();
+    if (!systemCredentials.isEmpty()) {
+      nodeHeartBeatResponse.setSystemCredentialsForApps(systemCredentials);
+    }
+
     // 4. Send status to RMNode, saving the latest response.
     this.rmContext.getDispatcher().getEventHandler().handle(
         new RMNodeStatusEvent(nodeId, remoteNodeStatus.getNodeHealthStatus(),

http://git-wip-us.apache.org/repos/asf/hadoop/blob/a16d022c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
index e0c3224..2dc331e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/DelegationTokenRenewer.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.security.PrivilegedAction;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -45,14 +47,20 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.AbstractEvent;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -82,12 +90,10 @@ public class DelegationTokenRenewer extends AbstractService {
   private DelegationTokenCancelThread dtCancelThread =
     new DelegationTokenCancelThread();
   private ThreadPoolExecutor renewerService;
-  
-  // managing the list of tokens using Map
-  // appId=>List<tokens>
-  private Set<DelegationTokenToRenew> delegationTokens = 
-    Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>());
-  
+
+  private ConcurrentMap<ApplicationId, Set<DelegationTokenToRenew>> appTokens =
+      new ConcurrentHashMap<ApplicationId, Set<DelegationTokenToRenew>>();
+
   private final ConcurrentMap<ApplicationId, Long> delayedRemovalMap =
       new ConcurrentHashMap<ApplicationId, Long>();
 
@@ -99,20 +105,33 @@ public class DelegationTokenRenewer extends AbstractService {
   private LinkedBlockingQueue<DelegationTokenRenewerEvent> pendingEventQueue;
   
   private boolean tokenKeepAliveEnabled;
-  
+  private boolean hasProxyUserPrivileges;
+  private long credentialsValidTimeRemaining;
+
+  // this config is supposedly not used by end-users.
+  public static final String RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING =
+      YarnConfiguration.RM_PREFIX + "system-credentials.valid-time-remaining";
+  public static final long DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING =
+      10800000; // 3h
+
   public DelegationTokenRenewer() {
     super(DelegationTokenRenewer.class.getName());
   }
 
   @Override
-  protected synchronized void serviceInit(Configuration conf) throws Exception {
+  protected void serviceInit(Configuration conf) throws Exception {
+    this.hasProxyUserPrivileges =
+        conf.getBoolean(YarnConfiguration.RM_PROXY_USER_PRIVILEGES_ENABLED,
+          YarnConfiguration.DEFAULT_RM_PROXY_USER_PRIVILEGES_ENABLED);
     this.tokenKeepAliveEnabled =
         conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
             YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED);
     this.tokenRemovalDelayMs =
         conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
             YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
-
+    this.credentialsValidTimeRemaining =
+        conf.getLong(RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING,
+          DEFAULT_RM_SYSTEM_CREDENTIALS_VALID_TIME_REMAINING);
     setLocalSecretManagerAndServiceAddr();
     renewerService = createNewThreadPoolService(conf);
     pendingEventQueue = new LinkedBlockingQueue<DelegationTokenRenewerEvent>();
@@ -182,7 +201,7 @@ public class DelegationTokenRenewer extends AbstractService {
     if (renewalTimer != null) {
       renewalTimer.cancel();
     }
-    delegationTokens.clear();
+    appTokens.clear();
     this.renewerService.shutdown();
     dtCancelThread.interrupt();
     try {
@@ -212,22 +231,28 @@ public class DelegationTokenRenewer extends AbstractService {
     public long expirationDate;
     public TimerTask timerTask;
     public final boolean shouldCancelAtEnd;
-    
-    public DelegationTokenToRenew(
-        ApplicationId jId, Token<?> token, 
-        Configuration conf, long expirationDate, boolean shouldCancelAtEnd) {
+    public long maxDate;
+    public String user;
+
+    public DelegationTokenToRenew(ApplicationId jId, Token<?> token,
+        Configuration conf, long expirationDate, boolean shouldCancelAtEnd,
+        String user) {
       this.token = token;
+      this.user = user;
+      if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
+        try {
+          AbstractDelegationTokenIdentifier identifier =
+              (AbstractDelegationTokenIdentifier) token.decodeIdentifier();
+          maxDate = identifier.getMaxDate();
+        } catch (IOException e) {
+          throw new YarnRuntimeException(e);
+        }
+      }
       this.applicationId = jId;
       this.conf = conf;
       this.expirationDate = expirationDate;
       this.timerTask = null;
       this.shouldCancelAtEnd = shouldCancelAtEnd;
-      if (this.token==null || this.applicationId==null || this.conf==null) {
-        throw new IllegalArgumentException("Invalid params to renew token" +
-            ";token=" + this.token +
-            ";appId=" + this.applicationId +
-            ";conf=" + this.conf);
-      }
     }
     
     public void setTimerTask(TimerTask tTask) {
@@ -317,16 +342,14 @@ public class DelegationTokenRenewer extends AbstractService {
       }
     }
   }
-  //adding token
-  private void addTokenToList(DelegationTokenToRenew t) {
-    delegationTokens.add(t);
-  }
 
   @VisibleForTesting
   public Set<Token<?>> getDelegationTokens() {
     Set<Token<?>> tokens = new HashSet<Token<?>>();
-    for(DelegationTokenToRenew delegationToken : delegationTokens) {
-      tokens.add(delegationToken.token);
+    for (Set<DelegationTokenToRenew> tokenList : appTokens.values()) {
+      for (DelegationTokenToRenew token : tokenList) {
+        tokens.add(token.token);
+      }
     }
     return tokens;
   }
@@ -337,25 +360,28 @@ public class DelegationTokenRenewer extends AbstractService {
    * @param ts tokens
    * @param shouldCancelAtEnd true if tokens should be canceled when the app is
    * done else false. 
+   * @param user user
    * @throws IOException
    */
   public void addApplicationAsync(ApplicationId applicationId, Credentials ts,
-      boolean shouldCancelAtEnd) {
+      boolean shouldCancelAtEnd, String user) {
     processDelegationTokenRenewerEvent(new DelegationTokenRenewerAppSubmitEvent(
-      applicationId, ts, shouldCancelAtEnd));
+      applicationId, ts, shouldCancelAtEnd, user));
   }
 
   /**
    * Synchronously renew delegation tokens.
+   * @param user user
    */
   public void addApplicationSync(ApplicationId applicationId, Credentials ts,
-      boolean shouldCancelAtEnd) throws IOException{
+      boolean shouldCancelAtEnd, String user) throws IOException,
+      InterruptedException {
     handleAppSubmitEvent(new DelegationTokenRenewerAppSubmitEvent(
-      applicationId, ts, shouldCancelAtEnd));
+      applicationId, ts, shouldCancelAtEnd, user));
   }
 
   private void handleAppSubmitEvent(DelegationTokenRenewerAppSubmitEvent evt)
-      throws IOException {
+      throws IOException, InterruptedException {
     ApplicationId applicationId = evt.getApplicationId();
     Credentials ts = evt.getCredentials();
     boolean shouldCancelAtEnd = evt.shouldCancelAtEnd();
@@ -375,14 +401,21 @@ public class DelegationTokenRenewer extends AbstractService {
     // all renewable tokens are valid
     // At RM restart it is safe to assume that all the previously added tokens
     // are valid
-    List<DelegationTokenToRenew> tokenList =
-        new ArrayList<DelegationTokenRenewer.DelegationTokenToRenew>();
+    appTokens.put(applicationId,
+      Collections.synchronizedSet(new HashSet<DelegationTokenToRenew>()));
+    Set<DelegationTokenToRenew> tokenList = new HashSet<DelegationTokenToRenew>();
+    boolean hasHdfsToken = false;
     for (Token<?> token : tokens) {
       if (token.isManaged()) {
         tokenList.add(new DelegationTokenToRenew(applicationId,
-            token, getConfig(), now, shouldCancelAtEnd));
+            token, getConfig(), now, shouldCancelAtEnd, evt.getUser()));
+        if (token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
+          LOG.info(applicationId + " found existing hdfs token " + token);
+          hasHdfsToken = true;
+        }
       }
     }
+
     if (!tokenList.isEmpty()) {
       // Renewing token and adding it to timer calls are separated purposefully
       // If user provides incorrect token then it should not be added for
@@ -395,14 +428,15 @@ public class DelegationTokenRenewer extends AbstractService {
         }
       }
       for (DelegationTokenToRenew dtr : tokenList) {
-        addTokenToList(dtr);
+        appTokens.get(applicationId).add(dtr);
         setTimerForTokenRenewal(dtr);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Registering token for renewal for:" + " service = "
-              + dtr.token.getService() + " for appId = " + dtr.applicationId);
-        }
       }
     }
+
+    if (!hasHdfsToken) {
+      requestNewHdfsDelegationToken(applicationId, evt.getUser(),
+        shouldCancelAtEnd);
+    }
   }
 
   /**
@@ -424,14 +458,16 @@ public class DelegationTokenRenewer extends AbstractService {
       }
 
       Token<?> token = dttr.token;
+
       try {
-        renewToken(dttr);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Renewing delegation-token for:" + token.getService() + 
-              "; new expiration;" + dttr.expirationDate);
+        requestNewHdfsDelegationTokenIfNeeded(dttr);
+        // if the token is not replaced by a new token, renew the token
+        if (appTokens.get(dttr.applicationId).contains(dttr)) {
+          renewToken(dttr);
+          setTimerForTokenRenewal(dttr);// set the next one
+        } else {
+          LOG.info("The token was removed already. Token = [" +dttr +"]");
         }
-        
-        setTimerForTokenRenewal(dttr);// set the next one
       } catch (Exception e) {
         LOG.error("Exception renewing token" + token + ". Not rescheduled", e);
         removeFailedDelegationToken(dttr);
@@ -455,12 +491,14 @@ public class DelegationTokenRenewer extends AbstractService {
     // calculate timer time
     long expiresIn = token.expirationDate - System.currentTimeMillis();
     long renewIn = token.expirationDate - expiresIn/10; // little bit before the expiration
-    
     // need to create new task every time
     TimerTask tTask = new RenewalTimerTask(token);
     token.setTimerTask(tTask); // keep reference to the timer
 
     renewalTimer.schedule(token.timerTask, new Date(renewIn));
+
+    LOG.info("Renew " + token + " in " + expiresIn + " ms, appId = "
+        + token.applicationId);
   }
 
   // renew a token
@@ -470,16 +508,99 @@ public class DelegationTokenRenewer extends AbstractService {
     // need to use doAs so that http can find the kerberos tgt
     // NOTE: token renewers should be responsible for the correct UGI!
     try {
-      dttr.expirationDate = UserGroupInformation.getLoginUser().doAs(
-          new PrivilegedExceptionAction<Long>(){          
-            @Override
-            public Long run() throws Exception {
-              return dttr.token.renew(dttr.conf);
-            }
-          });
+      dttr.expirationDate =
+          UserGroupInformation.getLoginUser().doAs(
+            new PrivilegedExceptionAction<Long>() {
+              @Override
+              public Long run() throws Exception {
+                return dttr.token.renew(dttr.conf);
+              }
+            });
     } catch (InterruptedException e) {
       throw new IOException(e);
     }
+    LOG.info("Renewed delegation-token= [" + dttr + "], for "
+        + dttr.applicationId);
+  }
+
+  // Request new hdfs token if the token is about to expire, and remove the old
+  // token from the tokenToRenew list
+  private void requestNewHdfsDelegationTokenIfNeeded(
+      final DelegationTokenToRenew dttr) throws IOException,
+      InterruptedException {
+
+    if (hasProxyUserPrivileges
+        && dttr.maxDate - dttr.expirationDate < credentialsValidTimeRemaining
+        && dttr.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
+
+      // remove all old expiring hdfs tokens for this application.
+      Set<DelegationTokenToRenew> tokenSet = appTokens.get(dttr.applicationId);
+      if (tokenSet != null && !tokenSet.isEmpty()) {
+        Iterator<DelegationTokenToRenew> iter = tokenSet.iterator();
+        synchronized (tokenSet) {
+          while (iter.hasNext()) {
+            DelegationTokenToRenew t = iter.next();
+            if (t.token.getKind().equals(new Text("HDFS_DELEGATION_TOKEN"))) {
+              iter.remove();
+              if (t.timerTask != null) {
+                t.timerTask.cancel();
+              }
+              LOG.info("Removed expiring token " + t);
+            }
+          }
+        }
+      }
+      LOG.info("Token= (" + dttr + ") is expiring, request new token.");
+      requestNewHdfsDelegationToken(dttr.applicationId, dttr.user,
+        dttr.shouldCancelAtEnd);
+    }
+  }
+
+  private void requestNewHdfsDelegationToken(ApplicationId applicationId,
+      String user, boolean shouldCancelAtEnd) throws IOException,
+      InterruptedException {
+    // Get new hdfs tokens for this user
+    Credentials credentials = new Credentials();
+    Token<?>[] newTokens = obtainSystemTokensForUser(user, credentials);
+
+    // Add new tokens to the toRenew list.
+    LOG.info("Received new tokens for " + applicationId + ". Received "
+        + newTokens.length + " tokens.");
+    if (newTokens.length > 0) {
+      for (Token<?> token : newTokens) {
+        if (token.isManaged()) {
+          DelegationTokenToRenew tokenToRenew =
+              new DelegationTokenToRenew(applicationId, token, getConfig(),
+                Time.now(), shouldCancelAtEnd, user);
+          // renew the token to get the next expiration date.
+          renewToken(tokenToRenew);
+          setTimerForTokenRenewal(tokenToRenew);
+          appTokens.get(applicationId).add(tokenToRenew);
+          LOG.info("Received new token " + token);
+        }
+      }
+    }
+    DataOutputBuffer dob = new DataOutputBuffer();
+    credentials.writeTokenStorageToStream(dob);
+    ByteBuffer byteBuffer = ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
+    rmContext.getSystemCredentialsForApps().put(applicationId, byteBuffer);
+  }
+
+  protected Token<?>[] obtainSystemTokensForUser(String user,
+      final Credentials credentials) throws IOException, InterruptedException {
+    // Get new hdfs tokens on behalf of this user
+    UserGroupInformation proxyUser =
+        UserGroupInformation.createProxyUser(user,
+          UserGroupInformation.getLoginUser());
+    Token<?>[] newTokens =
+        proxyUser.doAs(new PrivilegedExceptionAction<Token<?>[]>() {
+          @Override
+          public Token<?>[] run() throws Exception {
+            return FileSystem.get(getConfig()).addDelegationTokens(
+              UserGroupInformation.getLoginUser().getUserName(), credentials);
+          }
+        });
+    return newTokens;
   }
 
   // cancel a token
@@ -497,13 +618,13 @@ public class DelegationTokenRenewer extends AbstractService {
    */
   private void removeFailedDelegationToken(DelegationTokenToRenew t) {
     ApplicationId applicationId = t.applicationId;
-    if (LOG.isDebugEnabled())
-      LOG.debug("removing failed delegation token for appid=" + applicationId + 
-          ";t=" + t.token.getService());
-    delegationTokens.remove(t);
+    LOG.error("removing failed delegation token for appid=" + applicationId
+        + ";t=" + t.token.getService());
+    appTokens.get(applicationId).remove(t);
     // cancel the timer
-    if(t.timerTask!=null)
+    if (t.timerTask != null) {
       t.timerTask.cancel();
+    }
   }
 
   /**
@@ -543,18 +664,21 @@ public class DelegationTokenRenewer extends AbstractService {
   }
 
   private void removeApplicationFromRenewal(ApplicationId applicationId) {
-    synchronized (delegationTokens) {
-      Iterator<DelegationTokenToRenew> it = delegationTokens.iterator();
-      while(it.hasNext()) {
-        DelegationTokenToRenew dttr = it.next();
-        if (dttr.applicationId.equals(applicationId)) {
+    rmContext.getSystemCredentialsForApps().remove(applicationId);
+    Set<DelegationTokenToRenew> tokens = appTokens.get(applicationId);
+
+    if (tokens != null && !tokens.isEmpty()) {
+      synchronized (tokens) {
+        Iterator<DelegationTokenToRenew> it = tokens.iterator();
+        while (it.hasNext()) {
+          DelegationTokenToRenew dttr = it.next();
           if (LOG.isDebugEnabled()) {
-            LOG.debug("Removing delegation token for appId=" + applicationId + 
-                "; token=" + dttr.token.getService());
+            LOG.debug("Removing delegation token for appId=" + applicationId
+                + "; token=" + dttr.token.getService());
           }
 
           // cancel the timer
-          if(dttr.timerTask!=null)
+          if (dttr.timerTask != null)
             dttr.timerTask.cancel();
 
           // cancel the token
@@ -670,17 +794,19 @@ public class DelegationTokenRenewer extends AbstractService {
     }
   }
   
-  private static class DelegationTokenRenewerAppSubmitEvent extends
+  static class DelegationTokenRenewerAppSubmitEvent extends
       DelegationTokenRenewerEvent {
 
     private Credentials credentials;
     private boolean shouldCancelAtEnd;
+    private String user;
 
     public DelegationTokenRenewerAppSubmitEvent(ApplicationId appId,
-        Credentials credentails, boolean shouldCancelAtEnd) {
+        Credentials credentails, boolean shouldCancelAtEnd, String user) {
       super(appId, DelegationTokenRenewerEventType.VERIFY_AND_START_APPLICATION);
       this.credentials = credentails;
       this.shouldCancelAtEnd = shouldCancelAtEnd;
+      this.user = user;
     }
 
     public Credentials getCredentials() {
@@ -690,6 +816,10 @@ public class DelegationTokenRenewer extends AbstractService {
     public boolean shouldCancelAtEnd() {
       return shouldCancelAtEnd;
     }
+
+    public String getUser() {
+      return user;
+    }
   }
   
   enum DelegationTokenRenewerEventType {