You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/06/16 05:12:32 UTC

svn commit: r1493450 - in /hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protoco...

Author: vinodkv
Date: Sun Jun 16 03:12:31 2013
New Revision: 1493450

URL: http://svn.apache.org/r1493450
Log:
YARN-693. Modified RM to send NMTokens on allocate call so that AMs can then use them for authentication with NMs. Contributed by Omkar Vinit Joshi.
svn merge --ignore-ancestry -c 1493448 ../../trunk/

Added:
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java
      - copied unchanged from r1493448, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NMToken.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java
      - copied unchanged from r1493448, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/NMTokenPBImpl.java
Modified:
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
    hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/CHANGES.txt Sun Jun 16 03:12:31 2013
@@ -314,6 +314,9 @@ Release 2.1.0-beta - UNRELEASED
     YARN-639. Modified Distributed Shell application to start using the new
     NMClient library. (Zhijie Shen via vinodkv)
 
+    YARN-693. Modified RM to send NMTokens on allocate call so that AMs can then
+    use them for authentication with NMs. (Omkar Vinit Joshi via vinodkv)
+
   OPTIMIZATIONS
 
     YARN-512. Log aggregation root directory check is more expensive than it

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/AllocateResponse.java Sun Jun 16 03:12:31 2013
@@ -29,10 +29,10 @@ import org.apache.hadoop.yarn.api.AMRMPr
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.util.Records;
 
 /**
@@ -67,7 +67,7 @@ public abstract class AllocateResponse {
       List<ContainerStatus> completedContainers,
       List<Container> allocatedContainers, List<NodeReport> updatedNodes,
       Resource availResources, AMCommand command, int numClusterNodes,
-      PreemptionMessage preempt) {
+      PreemptionMessage preempt, List<NMToken> nmTokens) {
     AllocateResponse response = Records.newRecord(AllocateResponse.class);
     response.setNumClusterNodes(numClusterNodes);
     response.setResponseId(responseId);
@@ -77,6 +77,7 @@ public abstract class AllocateResponse {
     response.setAvailableResources(availResources);
     response.setAMCommand(command);
     response.setPreemptionMessage(preempt);
+    response.setNMTokens(nmTokens);
     return response;
   }
 
@@ -202,7 +203,7 @@ public abstract class AllocateResponse {
   
   @Public
   @Stable
-  public abstract void setNMTokens(List<Token> nmTokens);
+  public abstract void setNMTokens(List<NMToken> nmTokens);
   
   /**
    * Get the list of NMTokens required for communicating with NM. New NMTokens
@@ -217,6 +218,6 @@ public abstract class AllocateResponse {
    */
   @Public
   @Stable
-  public abstract List<Token> getNMTokens();
+  public abstract List<NMToken> getNMTokens();
 
 }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/protocolrecords/impl/pb/AllocateResponsePBImpl.java Sun Jun 16 03:12:31 2013
@@ -23,21 +23,20 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.PreemptionMessage;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeReportPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.PreemptionMessagePBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
-import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
@@ -46,6 +45,7 @@ import org.apache.hadoop.yarn.proto.Yarn
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProto;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.AllocateResponseProtoOrBuilder;
 import org.apache.hadoop.yarn.util.ProtoUtils;
+import org.apache.hadoop.yarn.proto.YarnServiceProtos.NMTokenProto;
 
     
 public class AllocateResponsePBImpl extends AllocateResponse {
@@ -56,7 +56,7 @@ public class AllocateResponsePBImpl exte
   Resource limit;
 
   private List<Container> allocatedContainers = null;
-  private List<Token> nmTokens = null;
+  private List<NMToken> nmTokens = null;
   private List<ContainerStatus> completedContainersStatuses = null;
 
   private List<NodeReport> updatedNodes = null;
@@ -108,7 +108,7 @@ public class AllocateResponsePBImpl exte
     }
     if (nmTokens != null) {
       builder.clearNmTokens();
-      Iterable<TokenProto> iterable = getTokenProtoIterable(nmTokens);
+      Iterable<NMTokenProto> iterable = getTokenProtoIterable(nmTokens);
       builder.addAllNmTokens(iterable);
     }
     if (this.completedContainersStatuses != null) {
@@ -248,9 +248,11 @@ public class AllocateResponsePBImpl exte
   }
 
   @Override
-  public synchronized void setNMTokens(List<Token> nmTokens) {
+  public synchronized void setNMTokens(List<NMToken> nmTokens) {
     if (nmTokens == null || nmTokens.isEmpty()) {
-      this.nmTokens.clear();
+      if (this.nmTokens != null) {
+        this.nmTokens.clear();
+      }
       builder.clearNmTokens();
       return;
     }
@@ -260,7 +262,7 @@ public class AllocateResponsePBImpl exte
   }
 
   @Override
-  public synchronized List<Token> getNMTokens() {
+  public synchronized List<NMToken> getNMTokens() {
     initLocalNewNMTokenList();
     return nmTokens;
   }
@@ -334,9 +336,9 @@ public class AllocateResponsePBImpl exte
       return;
     }
     AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
-    List<TokenProto> list = p.getNmTokensList();
-    nmTokens = new ArrayList<Token>();
-    for (TokenProto t : list) {
+    List<NMTokenProto> list = p.getNmTokensList();
+    nmTokens = new ArrayList<NMToken>();
+    for (NMTokenProto t : list) {
       nmTokens.add(convertFromProtoFormat(t));
     }
   }
@@ -372,15 +374,15 @@ public class AllocateResponsePBImpl exte
     };
   }
 
-  private synchronized Iterable<TokenProto> getTokenProtoIterable(
-      final List<Token> nmTokenList) {
+  private synchronized Iterable<NMTokenProto> getTokenProtoIterable(
+      final List<NMToken> nmTokenList) {
     maybeInitBuilder();
-    return new Iterable<TokenProto>() {
+    return new Iterable<NMTokenProto>() {
       @Override
-      public synchronized Iterator<TokenProto> iterator() {
-        return new Iterator<TokenProto>() {
+      public synchronized Iterator<NMTokenProto> iterator() {
+        return new Iterator<NMTokenProto>() {
 
-          Iterator<Token> iter = nmTokenList.iterator();
+          Iterator<NMToken> iter = nmTokenList.iterator();
           
           @Override
           public boolean hasNext() {
@@ -388,7 +390,7 @@ public class AllocateResponsePBImpl exte
           }
           
           @Override
-          public TokenProto next() {
+          public NMTokenProto next() {
             return convertToProtoFormat(iter.next());
           }
           
@@ -524,11 +526,11 @@ public class AllocateResponsePBImpl exte
     return ((PreemptionMessagePBImpl)r).getProto();
   }
   
-  private synchronized TokenProto convertToProtoFormat(Token token) {
-    return ((TokenPBImpl)token).getProto();
+  private synchronized NMTokenProto convertToProtoFormat(NMToken token) {
+    return ((NMTokenPBImpl)token).getProto();
   }
   
-  private synchronized Token convertFromProtoFormat(TokenProto proto) {
-    return new TokenPBImpl(proto);
+  private synchronized NMToken convertFromProtoFormat(NMTokenProto proto) {
+    return new NMTokenPBImpl(proto);
   }
 }  

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_service_protos.proto Sun Jun 16 03:12:31 2013
@@ -59,6 +59,11 @@ message AllocateRequestProto {
   optional float progress = 6;
 }
 
+message NMTokenProto {
+  optional NodeIdProto nodeId = 1; 
+  optional hadoop.common.TokenProto token = 2;
+}
+
 message AllocateResponseProto {
   optional AMCommandProto a_m_command = 1;
   optional int32 response_id = 2;
@@ -68,7 +73,7 @@ message AllocateResponseProto {
   repeated NodeReportProto updated_nodes = 6;
   optional int32 num_cluster_nodes = 7;
   optional PreemptionMessageProto preempt = 8;
-  repeated hadoop.common.TokenProto nm_tokens = 9;
+  repeated NMTokenProto nm_tokens = 9;
 }
 
 //////////////////////////////////////////////////////

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClient.java Sun Jun 16 03:12:31 2013
@@ -21,15 +21,17 @@ package org.apache.hadoop.yarn.client;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.service.Service;
 
@@ -208,4 +210,13 @@ public interface AMRMClient<T extends AM
                                            String resourceName, 
                                            Resource capability);
 
+  /**
+   * Returns the NMTokens received on allocate calls. It does not communicate
+   * with the RM to get NMTokens. Whenever an allocate call returns a new token
+   * along with a container, AMRMClient caches that NMToken per node manager.
+   * The returned map should be shared with any component that communicates
+   * with a NodeManager (e.g. NMClient) using NMTokens. If a new NMToken is
+   * received for the same node manager, it replaces the cached one.
+   */
+  public ConcurrentMap<String, Token> getNMTokens();
 }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientAsync.java Sun Jun 16 03:12:31 2013
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -260,6 +262,19 @@ public class AMRMClientAsync<T extends C
   public int getClusterNodeCount() {
     return client.getClusterNodeCount();
   }
+
+  /**
+   * Returns the NMTokens received on allocate calls. It does not communicate
+   * with the RM to get NMTokens. Whenever an allocate call returns a new token
+   * along with a new container, AMRMClientAsync caches that NMToken per node
+   * manager. The returned map should be shared with any component that
+   * communicates with a NodeManager (e.g. NMClient / NMClientAsync) using
+   * NMTokens. If a new NMToken is received for the same node manager, it
+   * replaces the cached one.
+   */
+  public ConcurrentMap<String, Token> getNMTokens() {
+    return client.getNMTokens();
+  }
   
   private class HeartbeatThread extends Thread {
     public HeartbeatThread() {

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/AMRMClientImpl.java Sun Jun 16 03:12:31 2013
@@ -33,9 +33,11 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RPC;
@@ -49,9 +51,11 @@ import org.apache.hadoop.yarn.api.protoc
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -61,6 +65,8 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.ipc.YarnRPC;
 import org.apache.hadoop.yarn.service.AbstractService;
 
+import com.google.common.annotations.VisibleForTesting;
+
 // TODO check inputs for null etc. YARN-654
 
 @Unstable
@@ -73,6 +79,7 @@ public class AMRMClientImpl<T extends Co
       RecordFactoryProvider.getRecordFactory(null);
   
   private int lastResponseId = 0;
+  private ConcurrentHashMap<String, Token> nmTokens;
 
   protected AMRMProtocol rmClient;
   protected final ApplicationAttemptId appAttemptId;  
@@ -148,6 +155,7 @@ public class AMRMClientImpl<T extends Co
   public AMRMClientImpl(ApplicationAttemptId appAttemptId) {
     super(AMRMClientImpl.class.getName());
     this.appAttemptId = appAttemptId;
+    this.nmTokens = new ConcurrentHashMap<String, Token>();
   }
 
   @Override
@@ -238,6 +246,9 @@ public class AMRMClientImpl<T extends Co
         clusterNodeCount = allocateResponse.getNumClusterNodes();
         lastResponseId = allocateResponse.getResponseId();
         clusterAvailableResources = allocateResponse.getAvailableResources();
+        if (!allocateResponse.getNMTokens().isEmpty()) {
+          populateNMTokens(allocateResponse);
+        }
       }
     } finally {
       // TODO how to differentiate remote yarn exception vs error in rpc
@@ -265,6 +276,20 @@ public class AMRMClientImpl<T extends Co
     return allocateResponse;
   }
 
+  @Private
+  @VisibleForTesting
+  protected void populateNMTokens(AllocateResponse allocateResponse) {
+    for (NMToken token : allocateResponse.getNMTokens()) {
+      String nodeId = token.getNodeId().toString();
+      if (nmTokens.containsKey(nodeId)) {
+        LOG.debug("Replacing token for : " + nodeId);
+      } else {
+        LOG.debug("Received new token for : " + nodeId);
+      }
+      nmTokens.put(nodeId, token.getToken());
+    }
+  }
+
   @Override
   public void unregisterApplicationMaster(FinalApplicationStatus appStatus,
       String appMessage, String appTrackingUrl) throws YarnException,
@@ -512,4 +537,8 @@ public class AMRMClientImpl<T extends Co
     }
   }
 
+  @Override
+  public ConcurrentHashMap<String, Token> getNMTokens() {
+    return nmTokens;
+  }
 }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClient.java Sun Jun 16 03:12:31 2013
@@ -25,9 +25,14 @@ import static org.mockito.Mockito.when;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+
+import junit.framework.Assert;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.AMRMProtocol;
@@ -49,6 +54,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.AMRMClient.StoredContainerRequest;
@@ -437,6 +443,11 @@ public class TestAMRMClient {
     int allocatedContainerCount = 0;
     int iterationsLeft = 2;
     Set<ContainerId> releases = new TreeSet<ContainerId>();
+    
+    ConcurrentHashMap<String, Token> nmTokens = amClient.getNMTokens();
+    Assert.assertEquals(0, nmTokens.size());
+    HashMap<String, Token> receivedNMTokens = new HashMap<String, Token>();
+    
     while (allocatedContainerCount < containersRequestedAny
         && iterationsLeft-- > 0) {
       AllocateResponse allocResponse = amClient.allocate(0.1f);
@@ -450,12 +461,32 @@ public class TestAMRMClient {
         releases.add(rejectContainerId);
         amClient.releaseAssignedContainer(rejectContainerId);
       }
+      Assert.assertEquals(nmTokens.size(), amClient.getNMTokens().size());
+      Iterator<String> nodeI = nmTokens.keySet().iterator();
+      while (nodeI.hasNext()) {
+        String nodeId = nodeI.next();
+        if (!receivedNMTokens.containsKey(nodeId)) {
+          receivedNMTokens.put(nodeId, nmTokens.get(nodeId));
+        } else {
+          Assert.fail("Received token again for : " + nodeId);
+        }
+      }
+      nodeI = receivedNMTokens.keySet().iterator();
+      while (nodeI.hasNext()) {
+        nmTokens.remove(nodeI.next());
+      }
+      
       if(allocatedContainerCount < containersRequestedAny) {
         // sleep to let NM's heartbeat to RM and trigger allocations
         sleep(1000);
       }
     }
-
+    
+    Assert.assertEquals(0, amClient.getNMTokens().size());
+    // Should receive at least 1 token
+    Assert.assertTrue(receivedNMTokens.size() > 0
+        && receivedNMTokens.size() <= nodeCount);
+    
     assertTrue(allocatedContainerCount == containersRequestedAny);
     assertTrue(amClient.release.size() == 2);
     assertTrue(amClient.ask.size() == 0);
@@ -523,7 +554,6 @@ public class TestAMRMClient {
         sleep(1000);
       }
     }
-    
     assertTrue(amClient.ask.size() == 0);
     assertTrue(amClient.release.size() == 0);
   }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestAMRMClientAsync.java Sun Jun 16 03:12:31 2013
@@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.client.AMRMClient.ContainerRequest;
@@ -66,11 +67,11 @@ public class TestAMRMClientAsync {
     List<Container> allocated1 = Arrays.asList(
         Container.newInstance(null, null, null, null, null, null));
     final AllocateResponse response1 = createAllocateResponse(
-        new ArrayList<ContainerStatus>(), allocated1);
+        new ArrayList<ContainerStatus>(), allocated1, null);
     final AllocateResponse response2 = createAllocateResponse(completed1,
-        new ArrayList<Container>());
+        new ArrayList<Container>(), null);
     final AllocateResponse emptyResponse = createAllocateResponse(
-        new ArrayList<ContainerStatus>(), new ArrayList<Container>());
+        new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
 
     TestCallbackHandler callbackHandler = new TestCallbackHandler();
     final AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
@@ -146,7 +147,7 @@ public class TestAMRMClientAsync {
     Assert.assertEquals(null, callbackHandler.takeAllocatedContainers());
     Assert.assertEquals(null, callbackHandler.takeCompletedContainers());
   }
-  
+
   @Test(timeout=10000)
   public void testAMRMClientAsyncException() throws Exception {
     Configuration conf = new Configuration();
@@ -189,7 +190,7 @@ public class TestAMRMClientAsync {
     AMRMClient<ContainerRequest> client = mock(AMRMClientImpl.class);
     
     final AllocateResponse rebootResponse = createAllocateResponse(
-        new ArrayList<ContainerStatus>(), new ArrayList<Container>());
+        new ArrayList<ContainerStatus>(), new ArrayList<Container>(), null);
     rebootResponse.setAMCommand(AMCommand.AM_RESYNC);
     when(client.allocate(anyFloat())).thenReturn(rebootResponse);
     
@@ -215,9 +216,11 @@ public class TestAMRMClientAsync {
   }
   
   private AllocateResponse createAllocateResponse(
-      List<ContainerStatus> completed, List<Container> allocated) {
-    AllocateResponse response = AllocateResponse.newInstance(0, completed, allocated,
-        new ArrayList<NodeReport>(), null, null, 1, null);
+      List<ContainerStatus> completed, List<Container> allocated,
+      List<NMToken> nmTokens) {
+    AllocateResponse response =
+        AllocateResponse.newInstance(0, completed, allocated,
+            new ArrayList<NodeReport>(), null, null, 1, null, nmTokens);
     return response;
   }
 

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java Sun Jun 16 03:12:31 2013
@@ -21,15 +21,19 @@ package org.apache.hadoop.yarn.security;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.net.InetSocketAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
 
 
 public class NMTokenIdentifier extends TokenIdentifier {
@@ -106,5 +110,4 @@ public class NMTokenIdentifier extends T
   public UserGroupInformation getUser() {
     return UserGroupInformation.createRemoteUser(appAttemptId.toString());
   }
-
 }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseContainerTokenSecretManager.java Sun Jun 16 03:12:31 2013
@@ -18,14 +18,11 @@
 
 package org.apache.hadoop.yarn.server.security;
 
-import java.nio.ByteBuffer;
 import java.security.SecureRandom;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import javax.crypto.SecretKey;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -34,7 +31,6 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
-import org.apache.hadoop.yarn.util.Records;
 
 /**
  * SecretManager for ContainerTokens. Extended by both RM and NM and hence is

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java Sun Jun 16 03:12:31 2013
@@ -114,39 +114,4 @@ public class BaseNMTokenSecretManager ex
   public NMTokenIdentifier createIdentifier() {
     return new NMTokenIdentifier();
   }
-
-  /**
-   * Helper function for creating NMTokens.
-   */
-  public Token createNMToken(ApplicationAttemptId applicationAttemptId,
-      NodeId nodeId, String applicationSubmitter) {
-    byte[] password;
-    NMTokenIdentifier identifier;
-    
-    this.readLock.lock();
-    try {
-      identifier =
-          new NMTokenIdentifier(applicationAttemptId, nodeId,
-              applicationSubmitter, this.currentMasterKey.getMasterKey()
-                  .getKeyId());
-      password = this.createPassword(identifier);
-    } finally {
-      this.readLock.unlock();
-    }
-    return newNMToken(password, identifier);
-  }
-  
-  public static Token newNMToken(byte[] password,
-      NMTokenIdentifier identifier) {
-    NodeId nodeId = identifier.getNodeId();
-    // RPC layer client expects ip:port as service for tokens
-    InetSocketAddress addr =
-        NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
-    Token nmToken =
-        Token.newInstance(identifier.getBytes(),
-          NMTokenIdentifier.KIND.toString(), password, SecurityUtil
-            .buildTokenService(addr).toString());
-    return nmToken;
-
-  }
 }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java Sun Jun 16 03:12:31 2013
@@ -373,6 +373,12 @@ public class ApplicationMasterService ex
       // add preemption to the allocateResponse message (if any)
       allocateResponse.setPreemptionMessage(generatePreemptionMessage(allocation));
       
+      // Adding NMTokens for allocated containers.
+      if (!allocation.getContainers().isEmpty()) {
+        allocateResponse.setNMTokens(rmContext.getNMTokenSecretManager()
+            .getNMTokens(app.getUser(), appAttemptId,
+                allocation.getContainers()));
+      }
       return allocateResponse;
     }
   }
@@ -433,12 +439,15 @@ public class ApplicationMasterService ex
     AllocateResponse response =
         recordFactory.newRecordInstance(AllocateResponse.class);
     response.setResponseId(0);
-    LOG.info("Registering " + attemptId);
+    LOG.info("Registering app attempt : " + attemptId);
     responseMap.put(attemptId, response);
+    rmContext.getNMTokenSecretManager().registerApplicationAttempt(attemptId);
   }
 
   public void unregisterAttempt(ApplicationAttemptId attemptId) {
+    LOG.info("Unregistering app attempt : " + attemptId);
     responseMap.remove(attemptId);
+    rmContext.getNMTokenSecretManager().unregisterApplicationAttempt(attemptId);
   }
 
   public void refreshServiceAcls(Configuration configuration, 

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java Sun Jun 16 03:12:31 2013
@@ -218,7 +218,9 @@ public class ResourceTrackerService exte
       this.rmContext.getDispatcher().getEventHandler().handle(
           new RMNodeReconnectEvent(nodeId, rmNode));
     }
-
+    // Every time a node manager registers, clear any NMToken keys cached for
+    // that node on behalf of running applications.
+    this.nmTokenSecretManager.removeNodeKey(nodeId);
     this.nmLivelinessMonitor.register(nodeId);
 
     String message =

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java Sun Jun 16 03:12:31 2013
@@ -18,18 +18,35 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.security;
 
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.security.BaseNMTokenSecretManager;
 import org.apache.hadoop.yarn.server.security.MasterKeyData;
 
+import com.google.common.annotations.VisibleForTesting;
+
 
 public class NMTokenSecretManagerInRM extends BaseNMTokenSecretManager {
 
@@ -42,6 +59,7 @@ public class NMTokenSecretManagerInRM ex
   private final Timer timer;
   private final long rollingInterval;
   private final long activationDelay;
+  private final ConcurrentHashMap<ApplicationAttemptId, HashSet<NodeId>> appAttemptToNodeKeyMap;
   
   public NMTokenSecretManagerInRM(Configuration conf) {
     this.conf = conf;
@@ -70,6 +88,8 @@ public class NMTokenSecretManagerInRM ex
               + " should be more than 2 X "
               + YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS);
     }
+    appAttemptToNodeKeyMap =
+        new ConcurrentHashMap<ApplicationAttemptId, HashSet<NodeId>>();
   }
   
   /**
@@ -119,11 +139,23 @@ public class NMTokenSecretManagerInRM ex
           + this.nextMasterKey.getMasterKey().getKeyId());
       this.currentMasterKey = this.nextMasterKey;
       this.nextMasterKey = null;
+      clearApplicationNMTokenKeys();
     } finally {
       super.writeLock.unlock();
     }
   }
 
+  private void clearApplicationNMTokenKeys() {
+    // We should clear all node entries from every application attempt's set.
+    // TODO : Once we have a per-node master key, this will change to remove
+    // only the specific node from each set.
+    Iterator<HashSet<NodeId>> nodeSetI =
+        this.appAttemptToNodeKeyMap.values().iterator();
+    while (nodeSetI.hasNext()) {
+      nodeSetI.next().clear();
+    }
+  }
+
   public void start() {
     rollMasterKey();
     this.timer.scheduleAtFixedRate(new MasterKeyRoller(), rollingInterval,
@@ -150,4 +182,129 @@ public class NMTokenSecretManagerInRM ex
       activateNextMasterKey();
     }
   }
+  
+  public List<NMToken> getNMTokens(String applicationSubmitter,
+      ApplicationAttemptId appAttemptId, List<Container> containers) {
+    try {
+      this.readLock.lock();
+      List<NMToken> nmTokens = new ArrayList<NMToken>();
+      HashSet<NodeId> nodeSet = this.appAttemptToNodeKeyMap.get(appAttemptId);
+      if (nodeSet != null) {
+        for (Container container : containers) {
+          if (!nodeSet.contains(container.getNodeId())) {
+            LOG.debug("Sending NMToken for nodeId : "
+                + container.getNodeId().toString());
+            Token token = createNMToken(appAttemptId, container.getNodeId(),
+                applicationSubmitter);
+            NMToken nmToken =
+                NMToken.newInstance(container.getNodeId(), token);
+            nmTokens.add(nmToken);
+            nodeSet.add(container.getNodeId());
+          }
+        }
+      }
+      return nmTokens;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+  
+  public void registerApplicationAttempt(ApplicationAttemptId appAttemptId) {
+    try {
+      this.writeLock.lock();
+      this.appAttemptToNodeKeyMap.put(appAttemptId, new HashSet<NodeId>());
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+  
+  @Private
+  @VisibleForTesting
+  public boolean isApplicationAttemptRegistered(
+      ApplicationAttemptId appAttemptId) {
+    try {
+      this.readLock.lock();
+      return this.appAttemptToNodeKeyMap.containsKey(appAttemptId);
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+  
+  @Private
+  @VisibleForTesting
+  public boolean isApplicationAttemptNMTokenPresent(
+      ApplicationAttemptId appAttemptId, NodeId nodeId) {
+    try {
+      this.readLock.lock();
+      HashSet<NodeId> nodes = this.appAttemptToNodeKeyMap.get(appAttemptId);
+      if (nodes != null && nodes.contains(nodeId)) {
+        return true;
+      } else {
+        return false;
+      }
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+  
+  public void unregisterApplicationAttempt(ApplicationAttemptId appAttemptId) {
+    try {
+      this.writeLock.lock();
+      this.appAttemptToNodeKeyMap.remove(appAttemptId);
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+  
+  /**
+   * This is to be called when NodeManager reconnects or goes down. This will
+   * remove the NMTokens, if present, for any running application from cache.
+   * @param nodeId
+   */
+  public void removeNodeKey(NodeId nodeId) {
+    try {
+      this.writeLock.lock();
+      Iterator<HashSet<NodeId>> appNodeKeySetIterator =
+          this.appAttemptToNodeKeyMap.values().iterator();
+      while (appNodeKeySetIterator.hasNext()) {
+        appNodeKeySetIterator.next().remove(nodeId);
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+  
+  public static Token newNMToken(byte[] password,
+      NMTokenIdentifier identifier) {
+    NodeId nodeId = identifier.getNodeId();
+    // RPC layer client expects ip:port as service for tokens
+    InetSocketAddress addr =
+        NetUtils.createSocketAddrForHost(nodeId.getHost(), nodeId.getPort());
+    Token nmToken =
+        Token.newInstance(identifier.getBytes(),
+          NMTokenIdentifier.KIND.toString(), password, SecurityUtil
+            .buildTokenService(addr).toString());
+    return nmToken;
+  }
+  
+  /**
+   * Helper function for creating NMTokens.
+   */
+  public Token createNMToken(ApplicationAttemptId applicationAttemptId,
+      NodeId nodeId, String applicationSubmitter) {
+    byte[] password;
+    NMTokenIdentifier identifier;
+    
+    this.readLock.lock();
+    try {
+      identifier =
+          new NMTokenIdentifier(applicationAttemptId, nodeId,
+              applicationSubmitter, this.currentMasterKey.getMasterKey()
+                  .getKeyId());
+      password = this.createPassword(identifier);
+    } finally {
+      this.readLock.unlock();
+    }
+    return newNMToken(password, identifier);
+  }
 }

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java Sun Jun 16 03:12:31 2013
@@ -65,12 +65,12 @@ public class MockAM {
     RMAppAttempt attempt = app.getRMAppAttempt(attemptId);
     int timeoutSecs = 0;
     while (!finalState.equals(attempt.getAppAttemptState())
-        && timeoutSecs++ < 20) {
+        && timeoutSecs++ < 40) {
       System.out
           .println("AppAttempt : " + attemptId + " State is : " 
               + attempt.getAppAttemptState()
               + " Waiting for state : " + finalState);
-      Thread.sleep(500);
+      Thread.sleep(1000);
     }
     System.out.println("AppAttempt State is : " + attempt.getAppAttemptState());
     Assert.assertEquals("AppAttempt state is not correct (timedout)",

Modified: hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java?rev=1493450&r1=1493449&r2=1493450&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java (original)
+++ hadoop/common/branches/branch-2.1-beta/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestRM.java Sun Jun 16 03:12:31 2013
@@ -19,22 +19,27 @@
 package org.apache.hadoop.yarn.server.resourcemanager;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 
 import junit.framework.Assert;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
+import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM;
 import org.apache.log4j.Level;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -136,6 +141,184 @@ public class TestRM {
 
     rm.stop();
   }
+  
+  @Test
+  public void testNMToken() throws Exception {
+    MockRM rm = new MockRM();
+    try {
+      rm.start();
+      MockNM nm1 = rm.registerNode("h1:1234", 10000);
+      
+      NMTokenSecretManagerInRM nmTokenSecretManager =
+          rm.getRMContext().getNMTokenSecretManager();
+      
+      // submitting new application
+      RMApp app = rm.submitApp(1000);
+      
+      // start scheduling.
+      nm1.nodeHeartbeat(true);
+      
+      // Starting application attempt and launching
+      // It should get registered with NMTokenSecretManager.
+      RMAppAttempt attempt = app.getCurrentAppAttempt();
+
+      MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
+
+      Assert.assertTrue(nmTokenSecretManager
+          .isApplicationAttemptRegistered(attempt.getAppAttemptId()));
+      
+      // This will register application master.
+      am.registerAppAttempt();
+      
+      ArrayList<Container> containersReceivedForNM1 =
+          new ArrayList<Container>();
+      List<ContainerId> releaseContainerList =
+          new ArrayList<ContainerId>();
+      HashMap<String, Token> nmTokens = new HashMap<String, Token>();
+
+      // initially requesting 2 containers.
+      AllocateResponse response =
+          am.allocate("h1", 1000, 2, releaseContainerList);
+      nm1.nodeHeartbeat(true);
+      Assert.assertEquals(0, response.getAllocatedContainers().size());
+      allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 2,
+          nmTokens);
+      Assert.assertEquals(1, nmTokens.size());
+
+      
+      // requesting 2 more containers.
+      response = am.allocate("h1", 1000, 2, releaseContainerList);
+      nm1.nodeHeartbeat(true);
+      Assert.assertEquals(0, response.getAllocatedContainers().size());
+      allocateContainersAndValidateNMTokens(am, containersReceivedForNM1, 4,
+          nmTokens);
+      Assert.assertEquals(1, nmTokens.size());
+      
+      
+      // We will be simulating an NM restart later, so first register a newly
+      // added NM 2 (h2:1234).
+      MockNM nm2 = rm.registerNode("h2:1234", 10000);
+      nm2.nodeHeartbeat(true);
+      ArrayList<Container> containersReceivedForNM2 =
+          new ArrayList<Container>();
+      
+      response = am.allocate("h2", 1000, 2, releaseContainerList);
+      nm2.nodeHeartbeat(true);
+      Assert.assertEquals(0, response.getAllocatedContainers().size());
+      allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 2,
+          nmTokens);
+      Assert.assertEquals(2, nmTokens.size());
+      
+      // Simulating NM-2 restart.
+      nm2 = rm.registerNode("h2:1234", 10000);
+      nm2.nodeHeartbeat(true);
+      
+      int interval = 40;
+      // Wait for nm Token to be cleared.
+      while (nmTokenSecretManager
+          .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
+              nm2.getNodeId()) && interval-- > 0) {
+        LOG.info("waiting for nmToken to be cleared for : " + nm2.getNodeId());
+        Thread.sleep(1000);
+      }
+      Assert.assertTrue(nmTokenSecretManager
+          .isApplicationAttemptRegistered(attempt.getAppAttemptId()));
+      
+      // removing NMToken for h2:1234
+      nmTokens.remove(nm2.getNodeId().toString());
+      Assert.assertEquals(1, nmTokens.size());
+      
+      // We should again receive the NMToken.
+      response = am.allocate("h2", 1000, 2, releaseContainerList);
+      nm2.nodeHeartbeat(true);
+      Assert.assertEquals(0, response.getAllocatedContainers().size());
+      allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 4,
+          nmTokens);
+      Assert.assertEquals(2, nmTokens.size());
+
+      // Now rolling over the NMToken masterKey. It should resend the NMToken
+      // in the next allocate call.
+      Assert.assertTrue(nmTokenSecretManager
+          .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
+              nm1.getNodeId()));
+      Assert.assertTrue(nmTokenSecretManager
+          .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
+              nm2.getNodeId()));
+      
+      nmTokenSecretManager.rollMasterKey();
+      nmTokenSecretManager.activateNextMasterKey();
+      
+      Assert.assertFalse(nmTokenSecretManager
+          .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
+              nm1.getNodeId()));
+      Assert.assertFalse(nmTokenSecretManager
+          .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
+              nm2.getNodeId()));
+      // It should not remove application attempt entry.
+      Assert.assertTrue(nmTokenSecretManager
+          .isApplicationAttemptRegistered(attempt.getAppAttemptId()));
+
+      nmTokens.clear();
+      Assert.assertEquals(0, nmTokens.size());
+      // We should again receive the NMToken.
+      response = am.allocate("h2", 1000, 1, releaseContainerList);
+      nm2.nodeHeartbeat(true);
+      Assert.assertEquals(0, response.getAllocatedContainers().size());
+      allocateContainersAndValidateNMTokens(am, containersReceivedForNM2, 5,
+          nmTokens);
+      Assert.assertEquals(1, nmTokens.size());
+      Assert.assertTrue(nmTokenSecretManager
+          .isApplicationAttemptNMTokenPresent(attempt.getAppAttemptId(),
+              nm2.getNodeId()));
+      
+      
+      // After the AM finishes, the app attempt's NMToken entry must be removed.
+      Assert.assertTrue(nmTokenSecretManager
+          .isApplicationAttemptRegistered(attempt.getAppAttemptId()));
+      am.unregisterAppAttempt();
+      // marking all the containers as finished.
+      for (Container container : containersReceivedForNM1) {
+        nm1.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(),
+            ContainerState.COMPLETE);
+      }
+      for (Container container : containersReceivedForNM2) {
+        nm2.nodeHeartbeat(attempt.getAppAttemptId(), container.getId().getId(),
+            ContainerState.COMPLETE);
+      }
+      am.waitForState(RMAppAttemptState.FINISHED);
+      Assert.assertFalse(nmTokenSecretManager
+          .isApplicationAttemptRegistered(attempt.getAppAttemptId()));
+    } finally {
+      rm.stop();
+    }
+  }
+
+  protected void allocateContainersAndValidateNMTokens(MockAM am,
+      ArrayList<Container> containersReceived, int totalContainerRequested,
+      HashMap<String, Token> nmTokens) throws Exception, InterruptedException {
+    ArrayList<ContainerId> releaseContainerList = new ArrayList<ContainerId>();
+    AllocateResponse response;
+    ArrayList<ResourceRequest> resourceRequest =
+        new ArrayList<ResourceRequest>();      
+    while (containersReceived.size() < totalContainerRequested) {
+      LOG.info("requesting containers..");
+      response =
+          am.allocate(resourceRequest, releaseContainerList);
+      containersReceived.addAll(response.getAllocatedContainers());
+      if (!response.getNMTokens().isEmpty()) {
+        for (NMToken nmToken : response.getNMTokens()) {
+          String nodeId = nmToken.getNodeId().toString();
+          if (nmTokens.containsKey(nodeId)) {
+            Assert.fail("Duplicate NMToken received for : " + nodeId);
+          }
+          nmTokens.put(nodeId, nmToken.getToken());
+        }
+      }
+      LOG.info("Got " + containersReceived.size()
+          + " containers. Waiting to get " + totalContainerRequested);
+      Thread.sleep(500);
+    }
+  }
 
   @Test (timeout = 300000)
   public void testActivatingApplicationAfterAddingNM() throws Exception {