You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by vi...@apache.org on 2013/06/19 01:19:51 UTC

svn commit: r1494369 [1/3] - in /hadoop/common/trunk/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/...

Author: vinodkv
Date: Tue Jun 18 23:19:49 2013
New Revision: 1494369

URL: http://svn.apache.org/r1494369
Log:
YARN-694. Starting to use NMTokens to authenticate all communication with NodeManagers. Contributed by Omkar Vinit Joshi.

Added:
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenSelector.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/resources/core-site.xml
Modified:
    hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/security/LocalizerTokenSecretManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMContainerTokenSecretManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/security/NMTokenSecretManagerInNM.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/DummyContainerManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestContainerManagerWithLCE.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerReboot.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerShutdown.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/BaseContainerManagerTest.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/TestContainerLaunch.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/monitor/TestContainersMonitor.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/amlauncher/AMLauncher.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/security/NMTokenSecretManagerInRM.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
    hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/TestContainerManagerSecurity.java

Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Tue Jun 18 23:19:49 2013
@@ -199,6 +199,9 @@ Release 2.1.0-beta - UNRELEASED
     YARN-850. Rename getClusterAvailableResources to getAvailableResources in
     AMRMClients (Jian He via bikas)
 
+    YARN-694. Starting to use NMTokens to authenticate all communication with
+    NodeManagers. (Omkar Vinit Joshi via vinodkv) 
+
   NEW FEATURES
 
     YARN-482. FS: Extend SchedulingMode to intermediate queues. 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java Tue Jun 18 23:19:49 2013
@@ -728,6 +728,23 @@ public class YarnConfiguration extends C
       YARN_PREFIX + "client.nodemanager-client-async.thread-pool-max-size";
   public static final int DEFAULT_NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE = 500;
 
+  /**
+   * Maximum number of proxy connections for node manager. It should always be
+   * more than 1. NMClient and MRAppMaster will use this to cache connection
+   * with node manager. There will be at max one connection per node manager.
+   * Ex. configuring it to a value of 5 will make sure that client will at
+   * max have 5 connections cached with 5 different node managers. These
+   * connections will be timed out if idle for more than system wide idle
+   * timeout period. If a token is used for authentication, it is used only
+   * at connection creation time. If a new token is received, the earlier
+   * connection should be closed in order to use the newer token.
+   * Note: this setting and
+   * {@link YarnConfiguration#NM_CLIENT_ASYNC_THREAD_POOL_MAX_SIZE} are related.
+   */
+  public static final String NM_CLIENT_MAX_NM_PROXIES =
+      YARN_PREFIX + "client.max-nodemanagers-proxies";
+  public static final int DEFAULT_NM_CLIENT_MAX_NM_PROXIES = 500;
+  
   public YarnConfiguration() {
     super();
   }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java Tue Jun 18 23:19:49 2013
@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
 import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
 import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -447,7 +448,8 @@ public class ApplicationMaster {
     resourceManager.start();
 
     containerListener = new NMCallbackHandler();
-    nmClientAsync = NMClientAsync.createNMClientAsync(containerListener);
+    nmClientAsync =
+        new NMClientAsyncImpl(containerListener, resourceManager.getNMTokens());
     nmClientAsync.init(conf);
     nmClientAsync.start();
 
@@ -683,8 +685,7 @@ public class ApplicationMaster {
       }
       Container container = containers.get(containerId);
       if (container != null) {
-        nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId(),
-            container.getContainerToken());
+        nmClientAsync.getContainerStatusAsync(containerId, container.getNodeId());
       }
     }
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/test/java/org/apache/hadoop/yarn/applications/distributedshell/TestDistributedShell.java Tue Jun 18 23:19:49 2013
@@ -26,6 +26,7 @@ import java.io.OutputStream;
 import java.net.URL;
 
 import junit.framework.Assert;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -33,6 +34,8 @@ import org.apache.hadoop.util.JarFinder;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.server.MiniYARNCluster;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
 import org.junit.AfterClass;
@@ -50,7 +53,7 @@ public class TestDistributedShell {
   protected static String APPMASTER_JAR = JarFinder.getJar(ApplicationMaster.class);
 
   @BeforeClass
-  public static void setup() throws InterruptedException, IOException {
+  public static void setup() throws InterruptedException, Exception {
     LOG.info("Starting up YARN cluster");
     conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 128);
     conf.setClass(YarnConfiguration.RM_SCHEDULER, 
@@ -60,6 +63,9 @@ public class TestDistributedShell {
         TestDistributedShell.class.getSimpleName(), 1, 1, 1);
       yarnCluster.init(conf);
       yarnCluster.start();
+      NodeManager  nm = yarnCluster.getNodeManager(0);
+      waitForNMToRegister(nm);
+      
       URL url = Thread.currentThread().getContextClassLoader().getResource("yarn-site.xml");
       if (url == null) {
         throw new RuntimeException("Could not find 'yarn-site.xml' dummy file in classpath");
@@ -195,5 +201,14 @@ public class TestDistributedShell {
     }
   }
 
+  protected static void waitForNMToRegister(NodeManager nm)
+      throws Exception {
+    int attempt = 60;
+    ContainerManagerImpl cm =
+        ((ContainerManagerImpl) nm.getNMContext().getContainerManager());
+    while (cm.getBlockNewContainerRequestsStatus() && attempt-- > 0) {
+      Thread.sleep(2000);
+    }
+  }
 }
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/NMClient.java Tue Jun 18 23:19:49 2013
@@ -1,4 +1,5 @@
 /**
+
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -21,16 +22,19 @@ package org.apache.hadoop.yarn.client.ap
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.impl.NMClientImpl;
@@ -42,19 +46,30 @@ public abstract class NMClient extends A
 
   /**
    * Create a new instance of NMClient.
+   * @param nmTokens the map of NMTokens received from the
+   * {@link AMRMClient#allocate(float)} call as a part of
+   * {@link AllocateResponse}. 
+   * key :- NodeAddr (host:port)
+   * Value :- Token {@link NMToken#getToken()}
    */
   @Public
-  public static NMClient createNMClient() {
-    NMClient client = new NMClientImpl();
+  public static NMClient createNMClient(ConcurrentMap<String, Token> nmTokens) {
+    NMClient client = new NMClientImpl(nmTokens);
     return client;
   }
 
   /**
    * Create a new instance of NMClient.
+   * @param nmTokens the map of NMTokens received from the
+   * {@link AMRMClient#allocate(float)} call as a part of
+   * {@link AllocateResponse}. 
+   * key :- NodeAddr (host:port)
+   * Value :- Token {@link NMToken#getToken()}
    */
   @Public
-  public static NMClient createNMClient(String name) {
-    NMClient client = new NMClientImpl(name);
+  public static NMClient createNMClient(String name,
+      ConcurrentMap<String, Token> nmTokens) {
+    NMClient client = new NMClientImpl(name, nmTokens);
     return client;
   }
 
@@ -89,35 +104,33 @@ public abstract class NMClient extends A
    *
    * @param containerId the Id of the started container
    * @param nodeId the Id of the <code>NodeManager</code>
-   * @param containerToken the security token to verify authenticity of the
-   *                       started container
+   * 
    * @throws YarnException
    * @throws IOException
    */
-  public abstract void stopContainer(ContainerId containerId, NodeId nodeId,
-      Token containerToken) throws YarnException, IOException;
+  public abstract void stopContainer(ContainerId containerId, NodeId nodeId)
+      throws YarnException, IOException;
 
   /**
    * <p>Query the status of a container.</p>
    *
    * @param containerId the Id of the started container
    * @param nodeId the Id of the <code>NodeManager</code>
-   * @param containerToken the security token to verify authenticity of the
-   *                       started container
+   * 
    * @return the status of a container
    * @throws YarnException
    * @throws IOException
    */
-  public abstract ContainerStatus getContainerStatus(ContainerId containerId, NodeId nodeId,
-      Token containerToken) throws YarnException, IOException;
+  public abstract ContainerStatus getContainerStatus(ContainerId containerId,
+      NodeId nodeId) throws YarnException, IOException;
 
   /**
    * <p>Set whether the containers that are started by this client, and are
    * still running should be stopped when the client stops. By default, the
-   * feature should be enabled.</p>
+   * feature should be enabled. However, containers will be stopped only
+   * when the service is stopped, i.e. after {@link NMClient#stop()}.</p>
    *
    * @param enabled whether the feature is enabled or not
    */
   public abstract void cleanupRunningContainersOnStop(boolean enabled);
-
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/NMClientAsync.java Tue Jun 18 23:19:49 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.client.ap
 
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
@@ -110,16 +111,19 @@ public abstract class NMClientAsync exte
   protected NMClient client;
   protected CallbackHandler callbackHandler;
 
-  public static NMClientAsync createNMClientAsync(CallbackHandler callbackHandler) {
-    return new NMClientAsyncImpl(callbackHandler);
+  public static NMClientAsync createNMClientAsync(
+      CallbackHandler callbackHandler, ConcurrentMap<String, Token> nmTokens) {
+    return new NMClientAsyncImpl(callbackHandler, nmTokens);
   }
   
-  protected NMClientAsync(CallbackHandler callbackHandler) {
-    this (NMClientAsync.class.getName(), callbackHandler);
+  protected NMClientAsync(CallbackHandler callbackHandler,
+      ConcurrentMap<String, Token> nmTokens) {
+    this (NMClientAsync.class.getName(), callbackHandler, nmTokens);
   }
 
-  protected NMClientAsync(String name, CallbackHandler callbackHandler) {
-    this (name, new NMClientImpl(), callbackHandler);
+  protected NMClientAsync(String name, CallbackHandler callbackHandler,
+      ConcurrentMap<String, Token> nmTokens) {
+    this (name, new NMClientImpl(nmTokens), callbackHandler);
   }
 
   @Private
@@ -135,10 +139,10 @@ public abstract class NMClientAsync exte
       Container container, ContainerLaunchContext containerLaunchContext);
 
   public abstract void stopContainerAsync(
-      ContainerId containerId, NodeId nodeId, Token containerToken);
+      ContainerId containerId, NodeId nodeId);
 
   public abstract void getContainerStatusAsync(
-      ContainerId containerId, NodeId nodeId, Token containerToken);
+      ContainerId containerId, NodeId nodeId);
   
   public NMClient getClient() {
     return client;

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/async/impl/NMClientAsyncImpl.java Tue Jun 18 23:19:49 2013
@@ -82,12 +82,14 @@ public class NMClientAsyncImpl extends N
   protected ConcurrentMap<ContainerId, StatefulContainer> containers =
       new ConcurrentHashMap<ContainerId, StatefulContainer>();
 
-  public NMClientAsyncImpl(CallbackHandler callbackHandler) {
-    this (NMClientAsyncImpl.class.getName(), callbackHandler);
+  public NMClientAsyncImpl(CallbackHandler callbackHandler,
+      ConcurrentMap<String, Token> nmTokens) {
+    this(NMClientAsync.class.getName(), callbackHandler, nmTokens);
   }
 
-  public NMClientAsyncImpl(String name, CallbackHandler callbackHandler) {
-    this (name, new NMClientImpl(), callbackHandler);
+  public NMClientAsyncImpl(String name, CallbackHandler callbackHandler,
+      ConcurrentMap<String, Token> nmTokens) {
+    this(name, new NMClientImpl(nmTokens), callbackHandler);
   }
 
   @Private
@@ -229,15 +231,14 @@ public class NMClientAsyncImpl extends N
     }
   }
 
-  public void stopContainerAsync(ContainerId containerId, NodeId nodeId,
-      Token containerToken) {
+  public void stopContainerAsync(ContainerId containerId, NodeId nodeId) {
     if (containers.get(containerId) == null) {
       callbackHandler.onStopContainerError(containerId,
           RPCUtil.getRemoteException("Container " + containerId +
               " is neither started nor scheduled to start"));
     }
     try {
-      events.put(new ContainerEvent(containerId, nodeId, containerToken,
+      events.put(new ContainerEvent(containerId, nodeId, null,
           ContainerEventType.STOP_CONTAINER));
     } catch (InterruptedException e) {
       LOG.warn("Exception when scheduling the event of stopping Container " +
@@ -246,10 +247,9 @@ public class NMClientAsyncImpl extends N
     }
   }
 
-  public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId,
-      Token containerToken) {
+ public void getContainerStatusAsync(ContainerId containerId, NodeId nodeId) {
     try {
-      events.put(new ContainerEvent(containerId, nodeId, containerToken,
+      events.put(new ContainerEvent(containerId, nodeId, null,
           ContainerEventType.QUERY_CONTAINER));
     } catch (InterruptedException e) {
       LOG.warn("Exception when scheduling the event of querying the status" +
@@ -421,9 +421,9 @@ public class NMClientAsyncImpl extends N
           StatefulContainer container, ContainerEvent event) {
         ContainerId containerId = event.getContainerId();
         try {
-          container.nmClientAsync.getClient().stopContainer(
-              containerId, event.getNodeId(), event.getContainerToken());
-          try {
+         container.nmClientAsync.getClient().stopContainer(
+              containerId, event.getNodeId());
+         try {
             container.nmClientAsync.getCallbackHandler().onContainerStopped(
                 event.getContainerId());
           } catch (Throwable thr) {
@@ -534,7 +534,7 @@ public class NMClientAsyncImpl extends N
       if (event.getType() == ContainerEventType.QUERY_CONTAINER) {
         try {
           ContainerStatus containerStatus = client.getContainerStatus(
-              containerId, event.getNodeId(), event.getContainerToken());
+              containerId, event.getNodeId());
           try {
             callbackHandler.onContainerStatusReceived(
                 containerId, containerStatus);

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java?rev=1494369&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/ContainerManagementProtocolProxy.java Tue Jun 18 23:19:49 2013
@@ -0,0 +1,316 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.client.api.impl;
+
+import java.net.InetSocketAddress;
+import java.security.PrivilegedAction;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.ipc.YarnRPC;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.common.annotations.VisibleForTesting;
+
+
+/**
+ * Helper class to manage container manager proxies.
+ *
+ * Proxies are cached per NodeManager bind address and re-created when the
+ * NMToken for that address changes. Every proxy handed out by
+ * {@link #getProxy(String, ContainerId)} must be given back with
+ * {@link #mayBeCloseProxy(ContainerManagementProtocolProxyData)}.
+ */
+@LimitedPrivate({ "MapReduce", "YARN" })
+public class ContainerManagementProtocolProxy {
+  static final Log LOG = LogFactory.getLog(ContainerManagementProtocolProxy.class);
+
+  private final int maxConnectedNMs;
+  private final LinkedHashMap<String, ContainerManagementProtocolProxyData> cmProxy;
+  private Map<String, Token> nmTokens;
+  private final Configuration conf;
+  private final YarnRPC rpc;
+
+  /**
+   * Creates an empty proxy cache.
+   *
+   * @param conf configuration from which
+   *          {@link YarnConfiguration#NM_CLIENT_MAX_NM_PROXIES} is read and
+   *          which is used to create the RPC layer and its proxies
+   * @param nmTokens NMTokens keyed by NodeManager bind address
+   * @throws YarnRuntimeException if the configured proxy limit is less
+   *           than 1
+   */
+  public ContainerManagementProtocolProxy(Configuration conf,
+      Map<String, Token> nmTokens) {
+    this.nmTokens = nmTokens;
+    this.conf = conf;
+
+    maxConnectedNMs =
+        conf.getInt(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES,
+            YarnConfiguration.DEFAULT_NM_CLIENT_MAX_NM_PROXIES);
+    if (maxConnectedNMs < 1) {
+      throw new YarnRuntimeException(
+          YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES
+              + " (" + maxConnectedNMs + ") can not be less than 1.");
+    }
+    LOG.info(YarnConfiguration.NM_CLIENT_MAX_NM_PROXIES + " : "
+        + maxConnectedNMs);
+
+    cmProxy =
+        new LinkedHashMap<String, ContainerManagementProtocolProxyData>();
+    rpc = YarnRPC.create(conf);
+  }
+
+  /**
+   * Returns the proxy for the given NodeManager, creating and caching one if
+   * none is cached. A cached proxy whose token differs from the current
+   * NMToken of the address is retired first; while such a proxy is still in
+   * use by other callers this method waits for it to be closed.
+   *
+   * @param containerManagerBindAddr address of the NodeManager
+   * @param containerId container on whose behalf the proxy is requested
+   * @return the proxy, with its count of active callers incremented
+   * @throws InvalidToken if there is no NMToken for the address
+   */
+  public synchronized ContainerManagementProtocolProxyData getProxy(
+      String containerManagerBindAddr, ContainerId containerId)
+      throws InvalidToken {
+    ContainerManagementProtocolProxyData proxy =
+        cmProxy.get(containerManagerBindAddr);
+
+    // wait() clears the interrupt status when it throws, so remember the
+    // interrupt and restore it on the way out instead of swallowing it.
+    boolean interrupted = false;
+    try {
+      while (proxy != null) {
+        Token currentToken = nmTokens.get(containerManagerBindAddr);
+        if (currentToken == null) {
+          // Fail the same way as for a new proxy rather than with an NPE.
+          throw new InvalidToken("No NMToken sent for "
+              + containerManagerBindAddr);
+        }
+        if (proxy.token.getIdentifier().equals(
+            currentToken.getIdentifier())) {
+          break;
+        }
+        LOG.info("Refreshing proxy as NMToken got updated for node : "
+            + containerManagerBindAddr);
+        // Token is updated. check if anyone has already tried closing it.
+        if (!proxy.scheduledForClose) {
+          // try closing the proxy. Here if someone is already using it
+          // then we might not close it. In which case we will wait.
+          removeProxy(proxy);
+        } else {
+          try {
+            this.wait();
+          } catch (InterruptedException e) {
+            interrupted = true;
+          }
+        }
+        if (proxy.activeCallers < 0) {
+          proxy = cmProxy.get(containerManagerBindAddr);
+        }
+      }
+    } finally {
+      if (interrupted) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    if (proxy == null) {
+      proxy =
+          new ContainerManagementProtocolProxyData(rpc, containerManagerBindAddr,
+              containerId, nmTokens.get(containerManagerBindAddr));
+      if (cmProxy.size() >= maxConnectedNMs) {
+        // The cache is full: retire the least recently used proxy.
+        String cmAddr = cmProxy.keySet().iterator().next();
+        removeProxy(cmProxy.get(cmAddr));
+      }
+
+      cmProxy.put(containerManagerBindAddr, proxy);
+    }
+    // This is to track active users of this proxy.
+    proxy.activeCallers++;
+    updateLRUCache(containerManagerBindAddr);
+
+    return proxy;
+  }
+
+  /**
+   * Re-inserts the entry for the given address so that it moves to the end
+   * of the map's iteration order, i.e. becomes the most recently used one.
+   */
+  private void updateLRUCache(String containerManagerBindAddr) {
+    ContainerManagementProtocolProxyData proxy =
+        cmProxy.remove(containerManagerBindAddr);
+    cmProxy.put(containerManagerBindAddr, proxy);
+  }
+
+  /**
+   * Releases a proxy obtained from {@link #getProxy(String, ContainerId)}.
+   * Once the proxy has been scheduled for close and its caller count has
+   * dropped below zero, it is removed from the cache, the RPC proxy is
+   * stopped and threads waiting in {@code getProxy} are woken up.
+   *
+   * @param proxy the proxy being released
+   */
+  public synchronized void mayBeCloseProxy(
+      ContainerManagementProtocolProxyData proxy) {
+    proxy.activeCallers--;
+    // removeProxy() also goes through here, so a proxy scheduled for close
+    // drops below zero once every caller has released it.
+    if (proxy.scheduledForClose && proxy.activeCallers < 0) {
+      LOG.info("Closing proxy : " + proxy.containerManagerBindAddr);
+      cmProxy.remove(proxy.containerManagerBindAddr);
+      try {
+        rpc.stopProxy(proxy.getContainerManagementProtocol(), conf);
+      } finally {
+        this.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * Schedules the proxy for close, unless that has already been done, and
+   * closes it right away if no caller is using it.
+   */
+  private synchronized void removeProxy(
+      ContainerManagementProtocolProxyData proxy) {
+    if (!proxy.scheduledForClose) {
+      proxy.scheduledForClose = true;
+      mayBeCloseProxy(proxy);
+    }
+  }
+
+  /**
+   * Forces every cached proxy that is not yet scheduled for close to be
+   * closed, regardless of its active callers, and then empties the cache.
+   */
+  public synchronized void stopAllProxies() {
+    // Iterate over a copy of the keys: closing a proxy removes it from cmProxy.
+    List<String> nodeIds = new ArrayList<String>(this.cmProxy.keySet());
+    for (String nodeId : nodeIds) {
+      ContainerManagementProtocolProxyData proxy = cmProxy.get(nodeId);
+      // Explicitly reducing the proxy count to allow stopping proxy.
+      // NOTE(review): a proxy already scheduled for close is skipped by
+      // removeProxy() and is only dropped from the map below - confirm.
+      proxy.activeCallers = 0;
+      try {
+        removeProxy(proxy);
+      } catch (Throwable t) {
+        LOG.error("Error closing connection", t);
+      }
+    }
+    cmProxy.clear();
+  }
+
+  /**
+   * Replaces the NMTokens used for proxies requested from now on.
+   *
+   * @param nmTokens NMTokens keyed by NodeManager bind address
+   */
+  public synchronized void setNMTokens(Map<String, Token> nmTokens) {
+    this.nmTokens = nmTokens;
+  }
+
+  /**
+   * A cached RPC proxy to one NodeManager, together with the NMToken it was
+   * created with and the bookkeeping needed to close it safely.
+   */
+  public class ContainerManagementProtocolProxyData {
+    private final String containerManagerBindAddr;
+    private final ContainerManagementProtocol proxy;
+    private int activeCallers;
+    private boolean scheduledForClose;
+    private final Token token;
+
+    @Private
+    @VisibleForTesting
+    public ContainerManagementProtocolProxyData(YarnRPC rpc,
+        String containerManagerBindAddr,
+        ContainerId containerId, Token token) throws InvalidToken {
+      this.containerManagerBindAddr = containerManagerBindAddr;
+      this.activeCallers = 0;
+      this.scheduledForClose = false;
+      this.token = token;
+      this.proxy = newProxy(rpc, containerManagerBindAddr, containerId, token);
+    }
+
+    /**
+     * Opens the RPC proxy to the NodeManager as a remote user that carries
+     * the given NMToken.
+     *
+     * @throws InvalidToken if {@code token} is null
+     */
+    @Private
+    @VisibleForTesting
+    protected ContainerManagementProtocol newProxy(final YarnRPC rpc,
+        String containerManagerBindAddr, ContainerId containerId, Token token)
+        throws InvalidToken {
+      if (token == null) {
+        throw new InvalidToken("No NMToken sent for "
+            + containerManagerBindAddr);
+      }
+      final InetSocketAddress cmAddr =
+          NetUtils.createSocketAddr(containerManagerBindAddr);
+      LOG.info("Opening proxy : " + containerManagerBindAddr);
+      // the user in createRemoteUser in this context has to be the
+      // ApplicationAttemptId of the container
+      UserGroupInformation user =
+          UserGroupInformation.createRemoteUser(containerId
+              .getApplicationAttemptId().toString());
+
+      org.apache.hadoop.security.token.Token<NMTokenIdentifier> nmToken =
+          ConverterUtils.convertFromYarn(token, cmAddr);
+      user.addToken(nmToken);
+
+      ContainerManagementProtocol proxy = user
+          .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
+
+            @Override
+            public ContainerManagementProtocol run() {
+              return (ContainerManagementProtocol) rpc.getProxy(
+                  ContainerManagementProtocol.class, cmAddr, conf);
+            }
+          });
+      return proxy;
+    }
+
+    /** @return the RPC proxy to the NodeManager */
+    public ContainerManagementProtocol getContainerManagementProtocol() {
+      return proxy;
+    }
+  }
+
+}

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/NMClientImpl.java Tue Jun 18 23:19:49 2013
@@ -19,9 +19,7 @@
 package org.apache.hadoop.yarn.client.api.impl;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.security.PrivilegedAction;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -29,31 +27,23 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.service.AbstractService;
-import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
-import org.apache.hadoop.yarn.api.protocolrecords.StartContainerResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StopContainerRequest;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.impl.ContainerManagementProtocolProxy.ContainerManagementProtocolProxyData;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.ipc.RPCUtil;
-import org.apache.hadoop.yarn.ipc.YarnRPC;
-import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
-import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.hadoop.yarn.util.Records;
 
 /**
  * <p>
@@ -91,14 +81,18 @@ public class NMClientImpl extends NMClie
       new ConcurrentHashMap<ContainerId, StartedContainer>();
 
   //enabled by default
-  private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+ private final AtomicBoolean cleanupRunningContainers = new AtomicBoolean(true);
+ private ContainerManagementProtocolProxy cmProxy;
+  private ConcurrentMap<String, Token> nmTokens;
 
-  public NMClientImpl() {
+  public NMClientImpl(ConcurrentMap<String, Token> nmTokens) {
     super(NMClientImpl.class.getName());
+    this.nmTokens = nmTokens;
   }
 
-  public NMClientImpl(String name) {
+  public NMClientImpl(String name, ConcurrentMap<String, Token> nmTokens) {
     super(name);
+    this.nmTokens = nmTokens;
   }
 
   @Override
@@ -108,6 +102,7 @@ public class NMClientImpl extends NMClie
     if (getCleanupRunningContainers().get()) {
       cleanupRunningContainers();
     }
+    cmProxy.stopAllProxies();
     super.serviceStop();
   }
 
@@ -115,8 +110,7 @@ public class NMClientImpl extends NMClie
     for (StartedContainer startedContainer : startedContainers.values()) {
       try {
         stopContainer(startedContainer.getContainerId(),
-            startedContainer.getNodeId(),
-            startedContainer.getContainerToken());
+            startedContainer.getNodeId());
       } catch (YarnException e) {
         LOG.error("Failed to stop Container " +
             startedContainer.getContainerId() +
@@ -130,22 +124,28 @@ public class NMClientImpl extends NMClie
   }
 
   @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    super.serviceInit(conf);
+    cmProxy =
+        new ContainerManagementProtocolProxy(conf, nmTokens);
+  }
+  
+  @Override
   public void cleanupRunningContainersOnStop(boolean enabled) {
     getCleanupRunningContainers().set(enabled);
   }
-
+  
   protected static class StartedContainer {
     private ContainerId containerId;
     private NodeId nodeId;
-    private Token containerToken;
-    private boolean stopped;
-
+    private ContainerState state;
+    
+    
     public StartedContainer(ContainerId containerId, NodeId nodeId,
         Token containerToken) {
       this.containerId = containerId;
       this.nodeId = nodeId;
-      this.containerToken = containerToken;
-      stopped = false;
+      state = ContainerState.NEW;
     }
 
     public ContainerId getContainerId() {
@@ -155,137 +155,17 @@ public class NMClientImpl extends NMClie
     public NodeId getNodeId() {
       return nodeId;
     }
-
-    public Token getContainerToken() {
-      return containerToken;
-    }
   }
 
-  protected static final class NMCommunicator extends AbstractService {
-    private ContainerId containerId;
-    private NodeId nodeId;
-    private Token containerToken;
-    private ContainerManagementProtocol containerManager;
-
-    public NMCommunicator(ContainerId containerId, NodeId nodeId,
-        Token containerToken) {
-      super(NMCommunicator.class.getName());
-      this.containerId = containerId;
-      this.nodeId = nodeId;
-      this.containerToken = containerToken;
-    }
-
-    @Override
-    protected synchronized void serviceStart() throws Exception {
-      final YarnRPC rpc = YarnRPC.create(getConfig());
-
-      final InetSocketAddress containerAddress =
-          NetUtils.createSocketAddr(nodeId.toString());
-
-      // the user in createRemoteUser in this context has to be ContainerId
-      UserGroupInformation currentUser =
-          UserGroupInformation.createRemoteUser(containerId.toString());
-
-      org.apache.hadoop.security.token.Token<ContainerTokenIdentifier> token =
-          ConverterUtils.convertFromYarn(containerToken, containerAddress);
-      currentUser.addToken(token);
-
-      containerManager = currentUser
-          .doAs(new PrivilegedAction<ContainerManagementProtocol>() {
-            @Override
-            public ContainerManagementProtocol run() {
-              return (ContainerManagementProtocol) rpc.getProxy(ContainerManagementProtocol.class,
-                  containerAddress, getConfig());
-            }
-          });
-
-      LOG.debug("Connecting to ContainerManager at " + containerAddress);
-      super.serviceStart();
-    }
-
-    @Override
-    protected synchronized void serviceStop() throws Exception {
-      if (this.containerManager != null) {
-        RPC.stopProxy(this.containerManager);
-
-        if (LOG.isDebugEnabled()) {
-          InetSocketAddress containerAddress =
-              NetUtils.createSocketAddr(nodeId.toString());
-          LOG.debug("Disconnecting from ContainerManager at " +
-              containerAddress);
-        }
-      }
-      super.serviceStop();
-    }
-
-    public synchronized Map<String, ByteBuffer> startContainer(
-        Container container, ContainerLaunchContext containerLaunchContext)
-            throws YarnException, IOException {
-      if (!container.getId().equals(containerId)) {
-        throw new IllegalArgumentException(
-            "NMCommunicator's containerId  mismatches the given Container's");
-      }
-      StartContainerResponse startResponse = null;
-      try {
-        StartContainerRequest startRequest =
-            Records.newRecord(StartContainerRequest.class);
-        startRequest.setContainerToken(container.getContainerToken());
-        startRequest.setContainerLaunchContext(containerLaunchContext);
-        startResponse = containerManager.startContainer(startRequest);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Started Container " + containerId);
-        }
-      } catch (YarnException e) {
-        LOG.warn("Container " + containerId + " failed to start", e);
-        throw e;
-      } catch (IOException e) {
-        LOG.warn("Container " + containerId + " failed to start", e);
-        throw e;
-      }
-      return startResponse.getAllServicesMetaData();
-    }
-
-    public synchronized void stopContainer() throws YarnException,
-        IOException {
-      try {
-        StopContainerRequest stopRequest =
-            Records.newRecord(StopContainerRequest.class);
-        stopRequest.setContainerId(containerId);
-        containerManager.stopContainer(stopRequest);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Stopped Container " + containerId);
-        }
-      } catch (YarnException e) {
-        LOG.warn("Container " + containerId + " failed to stop", e);
-        throw e;
-      } catch (IOException e) {
-        LOG.warn("Container " + containerId + " failed to stop", e);
-        throw e;
-      }
-    }
-
-    public synchronized ContainerStatus getContainerStatus()
-        throws YarnException, IOException {
-      GetContainerStatusResponse statusResponse = null;
-      try {
-        GetContainerStatusRequest statusRequest =
-            Records.newRecord(GetContainerStatusRequest.class);
-        statusRequest.setContainerId(containerId);
-        statusResponse = containerManager.getContainerStatus(statusRequest);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Got the status of Container " + containerId);
-        }
-      } catch (YarnException e) {
-        LOG.warn(
-            "Unable to get the status of Container " + containerId, e);
-        throw e;
-      } catch (IOException e) {
-        LOG.warn(
-            "Unable to get the status of Container " + containerId, e);
-        throw e;
-      }
-      return statusResponse.getStatus();
+  private void addStartingContainer(StartedContainer startedContainer)
+      throws YarnException {
+    if (startedContainers.putIfAbsent(startedContainer.containerId,
+        startedContainer) != null) {
+      throw RPCUtil.getRemoteException("Container "
+          + startedContainer.containerId.toString() + " is already started");
     }
+    startedContainers
+        .put(startedContainer.getContainerId(), startedContainer);
   }
 
   @Override
@@ -293,108 +173,112 @@ public class NMClientImpl extends NMClie
       Container container, ContainerLaunchContext containerLaunchContext)
           throws YarnException, IOException {
     // Do synchronization on StartedContainer to prevent race condition
-    // between startContainer and stopContainer
-    synchronized (addStartedContainer(container)) {
+    // between startContainer and stopContainer only when startContainer is
+    // in progress for a given container.
+    StartedContainer startingContainer = createStartedContainer(container);
+    synchronized (startingContainer) {
+      addStartingContainer(startingContainer);
+      
       Map<String, ByteBuffer> allServiceResponse;
-      NMCommunicator nmCommunicator = null;
+      ContainerManagementProtocolProxyData proxy = null;
       try {
-        nmCommunicator = new NMCommunicator(container.getId(),
-            container.getNodeId(), container.getContainerToken());
-        nmCommunicator.init(getConfig());
-        nmCommunicator.start();
+        proxy =
+            cmProxy.getProxy(container.getNodeId().toString(),
+                container.getId());
         allServiceResponse =
-            nmCommunicator.startContainer(container, containerLaunchContext);
+            proxy
+                .getContainerManagementProtocol().startContainer(
+                    StartContainerRequest.newInstance(containerLaunchContext,
+                        container.getContainerToken())).getAllServicesMetaData();
+        startingContainer.state = ContainerState.RUNNING;
       } catch (YarnException e) {
+        startingContainer.state = ContainerState.COMPLETE;
         // Remove the started container if it failed to start
-        removeStartedContainer(container.getId());
+        removeStartedContainer(startingContainer);
         throw e;
       } catch (IOException e) {
-        removeStartedContainer(container.getId());
+        startingContainer.state = ContainerState.COMPLETE;
+        removeStartedContainer(startingContainer);
         throw e;
       } catch (Throwable t) {
-        removeStartedContainer(container.getId());
+        startingContainer.state = ContainerState.COMPLETE;
+        removeStartedContainer(startingContainer);
         throw RPCUtil.getRemoteException(t);
       } finally {
-        if (nmCommunicator != null) {
-          nmCommunicator.stop();
+        if (proxy != null) {
+          cmProxy.mayBeCloseProxy(proxy);
         }
       }
       return allServiceResponse;
     }
-
-    // Three choices:
-    // 1. starting and releasing the proxy before and after each interaction
-    // 2. starting the proxy when starting the container and releasing it when
-    // stopping the container
-    // 3. starting the proxy when starting the container and releasing it when
-    // stopping the client
-    // Adopt 1 currently
   }
 
   @Override
-  public void stopContainer(ContainerId containerId, NodeId nodeId,
-      Token containerToken) throws YarnException, IOException {
+  public void stopContainer(ContainerId containerId, NodeId nodeId)
+      throws YarnException, IOException {
     StartedContainer startedContainer = getStartedContainer(containerId);
-    if (startedContainer == null) {
-      throw RPCUtil.getRemoteException("Container " + containerId +
-          " is either not started yet or already stopped");
-    }
+
     // Only allow one request of stopping the container to move forward
     // When entering the block, check whether the precursor has already stopped
     // the container
-    synchronized (startedContainer) {
-      if (startedContainer.stopped) {
-        return;
-      }
-      NMCommunicator nmCommunicator = null;
-      try {
-        nmCommunicator =
-            new NMCommunicator(containerId, nodeId, containerToken);
-        nmCommunicator.init(getConfig());
-        nmCommunicator.start();
-        nmCommunicator.stopContainer();
-      } finally {
-        if (nmCommunicator != null) {
-          nmCommunicator.stop();
+    if (startedContainer != null) {
+      synchronized (startedContainer) {
+        if (startedContainer.state != ContainerState.RUNNING) {
+          return;
         }
-        startedContainer.stopped = true;
-        removeStartedContainer(containerId);
+        stopContainerInternal(containerId, nodeId);
+        // Only after successful
+        startedContainer.state = ContainerState.COMPLETE;
+        removeStartedContainer(startedContainer);
       }
+    } else {
+      stopContainerInternal(containerId, nodeId);
     }
+
   }
 
   @Override
   public ContainerStatus getContainerStatus(ContainerId containerId,
-      NodeId nodeId, Token containerToken)
-          throws YarnException, IOException {
-    NMCommunicator nmCommunicator = null;
+      NodeId nodeId) throws YarnException, IOException {
+
+    ContainerManagementProtocolProxyData proxy = null;
     try {
-      nmCommunicator = new NMCommunicator(containerId, nodeId, containerToken);
-      nmCommunicator.init(getConfig());
-      nmCommunicator.start();
-      ContainerStatus containerStatus = nmCommunicator.getContainerStatus();
+      proxy = cmProxy.getProxy(nodeId.toString(), containerId);
+      ContainerStatus containerStatus =
+          proxy.getContainerManagementProtocol().getContainerStatus(
+              GetContainerStatusRequest.newInstance(containerId)).getStatus();
       return containerStatus;
     } finally {
-      if (nmCommunicator != null) {
-        nmCommunicator.stop();
+      if (proxy != null) {
+        cmProxy.mayBeCloseProxy(proxy);
       }
     }
   }
 
-  protected synchronized StartedContainer addStartedContainer(
-      Container container) throws YarnException, IOException {
-    if (startedContainers.containsKey(container.getId())) {
-      throw RPCUtil.getRemoteException("Container " + container.getId() +
-          " is already started");
+  private void stopContainerInternal(ContainerId containerId, NodeId nodeId)
+      throws IOException, YarnException {
+    ContainerManagementProtocolProxyData proxy = null;
+    try {
+      proxy = cmProxy.getProxy(nodeId.toString(), containerId);
+      proxy.getContainerManagementProtocol().stopContainer(
+          StopContainerRequest.newInstance(containerId));
+    } finally {
+      if (proxy != null) {
+        cmProxy.mayBeCloseProxy(proxy);
+      }
     }
+  }
+  
+  protected synchronized StartedContainer createStartedContainer(
+      Container container) throws YarnException, IOException {
     StartedContainer startedContainer = new StartedContainer(container.getId(),
         container.getNodeId(), container.getContainerToken());
-    startedContainers.put(startedContainer.getContainerId(), startedContainer);
     return startedContainer;
   }
 
-  protected synchronized void removeStartedContainer(ContainerId containerId) {
-    startedContainers.remove(containerId);
+  protected synchronized void
+      removeStartedContainer(StartedContainer container) {
+    startedContainers.remove(container.containerId);
   }
 
   protected synchronized StartedContainer getStartedContainer(
@@ -405,5 +289,4 @@ public class NMClientImpl extends NMClie
   public AtomicBoolean getCleanupRunningContainers() {
     return cleanupRunningContainers;
   }
-
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/async/impl/TestNMClientAsync.java Tue Jun 18 23:19:49 2013
@@ -232,10 +232,10 @@ public class TestNMClientAsync {
         actualStartSuccessArray.set(containerId.getId(), 1);
 
         // move on to the following success tests
-        asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken);
+        asyncClient.getContainerStatusAsync(containerId, nodeId);
       } else {
         // move on to the following failure tests
-        asyncClient.stopContainerAsync(containerId, nodeId, containerToken);
+        asyncClient.stopContainerAsync(containerId, nodeId);
       }
 
       // Shouldn't crash the test thread
@@ -253,7 +253,7 @@ public class TestNMClientAsync {
       actualQuerySuccess.addAndGet(1);
       actualQuerySuccessArray.set(containerId.getId(), 1);
       // move on to the following success tests
-      asyncClient.stopContainerAsync(containerId, nodeId, containerToken);
+      asyncClient.stopContainerAsync(containerId, nodeId);
 
       // Shouldn't crash the test thread
       throw new RuntimeException("Ignorable Exception");
@@ -290,7 +290,7 @@ public class TestNMClientAsync {
       actualStartFailure.addAndGet(1);
       actualStartFailureArray.set(containerId.getId() - expectedSuccess, 1);
       // move on to the following failure tests
-      asyncClient.getContainerStatusAsync(containerId, nodeId, containerToken);
+      asyncClient.getContainerStatusAsync(containerId, nodeId);
 
       // Shouldn't crash the test thread
       throw new RuntimeException("Ignorable Exception");
@@ -383,33 +383,30 @@ public class TestNMClientAsync {
         when(client.startContainer(any(Container.class),
             any(ContainerLaunchContext.class))).thenReturn(
                 Collections.<String, ByteBuffer>emptyMap());
-        when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
-            any(Token.class))).thenReturn(
+        when(client.getContainerStatus(any(ContainerId.class),
+            any(NodeId.class))).thenReturn(
                 recordFactory.newRecordInstance(ContainerStatus.class));
         doNothing().when(client).stopContainer(any(ContainerId.class),
-            any(NodeId.class), any(Token.class));
+            any(NodeId.class));
         break;
       case 1:
         doThrow(RPCUtil.getRemoteException("Start Exception")).when(client)
             .startContainer(any(Container.class),
                 any(ContainerLaunchContext.class));
         doThrow(RPCUtil.getRemoteException("Query Exception")).when(client)
-            .getContainerStatus(any(ContainerId.class), any(NodeId.class),
-                any(Token.class));
+            .getContainerStatus(any(ContainerId.class), any(NodeId.class));
         doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
-            .stopContainer(any(ContainerId.class), any(NodeId.class),
-                any(Token.class));
+            .stopContainer(any(ContainerId.class), any(NodeId.class));
         break;
       case 2:
         when(client.startContainer(any(Container.class),
             any(ContainerLaunchContext.class))).thenReturn(
                 Collections.<String, ByteBuffer>emptyMap());
-        when(client.getContainerStatus(any(ContainerId.class), any(NodeId.class),
-            any(Token.class))).thenReturn(
+        when(client.getContainerStatus(any(ContainerId.class),
+            any(NodeId.class))).thenReturn(
                 recordFactory.newRecordInstance(ContainerStatus.class));
         doThrow(RPCUtil.getRemoteException("Stop Exception")).when(client)
-            .stopContainer(any(ContainerId.class), any(NodeId.class),
-                any(Token.class));
+            .stopContainer(any(ContainerId.class), any(NodeId.class));
     }
     return client;
   }
@@ -437,8 +434,7 @@ public class TestNMClientAsync {
     t.start();
 
     barrierA.await();
-    asyncClient.stopContainerAsync(container.getId(), container.getNodeId(),
-        container.getContainerToken());
+    asyncClient.stopContainerAsync(container.getId(), container.getNodeId());
     barrierC.await();
 
     Assert.assertFalse("Starting and stopping should be out of order",

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/api/impl/TestNMClient.java Tue Jun 18 23:19:49 2013
@@ -29,6 +29,7 @@ import java.nio.ByteBuffer;
 import java.util.List;
 import java.util.Set;
 import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -46,10 +47,12 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
 import org.apache.hadoop.yarn.api.records.NodeReport;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.Token;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.client.api.AMRMClient;
 import org.apache.hadoop.yarn.client.api.NMClient;
@@ -75,7 +78,8 @@ public class TestNMClient {
   List<NodeReport> nodeReports = null;
   ApplicationAttemptId attemptId = null;
   int nodeCount = 3;
-
+  ConcurrentHashMap<String, Token> nmTokens;
+  
   @Before
   public void setup() throws YarnException, IOException {
     // start minicluster
@@ -140,6 +144,7 @@ public class TestNMClient {
     if (iterationsLeft == 0) {
       fail("Application hasn't bee started");
     }
+    nmTokens = new ConcurrentHashMap<String, Token>();
 
     // start am rm client
     rmClient =
@@ -151,7 +156,7 @@ public class TestNMClient {
     assertEquals(STATE.STARTED, rmClient.getServiceState());
 
     // start am nm client
-    nmClient = (NMClientImpl) NMClient.createNMClient();
+    nmClient = (NMClientImpl) NMClient.createNMClient(nmTokens);
     nmClient.init(conf);
     nmClient.start();
     assertNotNull(nmClient);
@@ -194,14 +199,13 @@ public class TestNMClient {
     assertEquals(0, nmClient.startedContainers.size());
   }
 
-  @Test (timeout = 60000)
+  @Test (timeout = 200000)
   public void testNMClient()
       throws YarnException, IOException {
-
     rmClient.registerApplicationMaster("Host", 10000, "");
 
     testContainerManagement(nmClient, allocateContainers(rmClient, 5));
-
+    
     rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED,
         null, null);
     // stop the running containers on close
@@ -243,6 +247,11 @@ public class TestNMClient {
       for(Container container : allocResponse.getAllocatedContainers()) {
         containers.add(container);
       }
+      if (!allocResponse.getNMTokens().isEmpty()) {
+        for (NMToken token : allocResponse.getNMTokens()) {
+          nmTokens.put(token.getNodeId().toString(), token.getToken());
+        }
+      }
       if(allocatedContainerCount < containersRequestedAny) {
         // sleep to let NM's heartbeat to RM and trigger allocations
         sleep(1000);
@@ -261,8 +270,7 @@ public class TestNMClient {
       // getContainerStatus shouldn't be called before startContainer,
       // otherwise, NodeManager cannot find the container
       try {
-        nmClient.getContainerStatus(container.getId(), container.getNodeId(),
-            container.getContainerToken());
+        nmClient.getContainerStatus(container.getId(), container.getNodeId());
         fail("Exception is expected");
       } catch (YarnException e) {
         assertTrue("The thrown exception is not expected",
@@ -272,12 +280,11 @@ public class TestNMClient {
       // stopContainer shouldn't be called before startContainer,
       // otherwise, an exception will be thrown
       try {
-        nmClient.stopContainer(container.getId(), container.getNodeId(),
-            container.getContainerToken());
+        nmClient.stopContainer(container.getId(), container.getNodeId());
         fail("Exception is expected");
       } catch (YarnException e) {
         if (!e.getMessage()
-              .contains("is either not started yet or already stopped")) {
+              .contains("is not handled by this NodeManager")) {
           throw (AssertionError)
             (new AssertionError("Exception is not expected: " + e).initCause(
               e));
@@ -306,8 +313,7 @@ public class TestNMClient {
             -1000);
 
         try {
-          nmClient.stopContainer(container.getId(), container.getNodeId(),
-              container.getContainerToken());
+          nmClient.stopContainer(container.getId(), container.getNodeId());
         } catch (YarnException e) {
           throw (AssertionError)
             (new AssertionError("Exception is not expected: " + e)
@@ -335,8 +341,7 @@ public class TestNMClient {
     while (true) {
       try {
         ContainerStatus status = nmClient.getContainerStatus(
-            container.getId(), container.getNodeId(),
-                container.getContainerToken());
+            container.getId(), container.getNodeId());
         // NodeManager may still need some time to get the stable
         // container status
         if (status.getState() == state) {

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/ContainerManagerSecurityInfo.java Tue Jun 18 23:19:49 2013
@@ -55,7 +55,7 @@ public class ContainerManagerSecurityInf
       @Override
       public Class<? extends TokenSelector<? extends TokenIdentifier>>
           value() {
-        return ContainerTokenSelector.class;
+        return NMTokenSelector.class;
       }
     };
 

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenIdentifier.java Tue Jun 18 23:19:49 2013
@@ -21,21 +21,17 @@ package org.apache.hadoop.yarn.security;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.net.InetSocketAddress;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Token;
 
 @Public
 @Evolving
@@ -48,14 +44,14 @@ public class NMTokenIdentifier extends T
   private ApplicationAttemptId appAttemptId;
   private NodeId nodeId;
   private String appSubmitter;
-  private int masterKeyId;
+  private int keyId;
 
   public NMTokenIdentifier(ApplicationAttemptId appAttemptId, NodeId nodeId,
       String applicationSubmitter, int masterKeyId) {
     this.appAttemptId = appAttemptId;
     this.nodeId = nodeId;
     this.appSubmitter = applicationSubmitter;
-    this.masterKeyId = masterKeyId;
+    this.keyId = masterKeyId;
   }
   
   /**
@@ -76,8 +72,8 @@ public class NMTokenIdentifier extends T
     return appSubmitter;
   }
   
-  public int getMastKeyId() {
-    return masterKeyId;
+  public int getKeyId() {
+    return keyId;
   }
   
   @Override
@@ -89,7 +85,7 @@ public class NMTokenIdentifier extends T
     out.writeInt(appAttemptId.getAttemptId());
     out.writeUTF(this.nodeId.toString());
     out.writeUTF(this.appSubmitter);
-    out.writeInt(this.masterKeyId);
+    out.writeInt(this.keyId);
   }
 
   @Override
@@ -101,7 +97,7 @@ public class NMTokenIdentifier extends T
     String[] hostAddr = in.readUTF().split(":");
     nodeId = NodeId.newInstance(hostAddr[0], Integer.parseInt(hostAddr[1]));
     appSubmitter = in.readUTF();
-    masterKeyId = in.readInt();
+    keyId = in.readInt();
   }
 
   @Override

Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenSelector.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenSelector.java?rev=1494369&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenSelector.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/security/NMTokenSelector.java Tue Jun 18 23:19:49 2013
@@ -0,0 +1,56 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.security;
+
+import java.util.Collection;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+
+/** Selects the NMToken addressed to a given service from a token set. */
+public class NMTokenSelector implements TokenSelector<NMTokenIdentifier> {
+
+  private static final Log LOG = LogFactory.getLog(NMTokenSelector.class);
+
+  /** First NMToken matching service; null if service is null or none match. */
+  @SuppressWarnings("unchecked")
+  @Override
+  public Token<NMTokenIdentifier> selectToken(Text service,
+      Collection<Token<? extends TokenIdentifier>> tokens) {
+    if (service == null) {
+      return null;
+    }
+    for (Token<? extends TokenIdentifier> token : tokens) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Looking for service: " + service + ". Current token is "
+            + token);
+      }
+      if (NMTokenIdentifier.KIND.equals(token.getKind())
+          && service.equals(token.getService())) {
+        // Cast is safe: the kind check above identifies an NMToken.
+        return (Token<NMTokenIdentifier>) token;
+      }
+    }
+    return null;
+  }
+}
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenIdentifier Tue Jun 18 23:19:49 2013
@@ -15,3 +15,4 @@ org.apache.hadoop.yarn.security.Containe
 org.apache.hadoop.yarn.security.AMRMTokenIdentifier
 org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier
 org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier
+org.apache.hadoop.yarn.security.NMTokenIdentifier

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml Tue Jun 18 23:19:49 2013
@@ -659,7 +659,25 @@
     <name>yarn.client.nodemanager-client-async.thread-pool-max-size</name>
     <value>500</value>
   </property>
-
+  
+  <property>
+  	<description>
+      Maximum number of proxy connections to node managers. It should always be
+      more than 1. NMClient and MRAppMaster use this to cache connections
+      with node managers. There is at most one connection per node manager.
+      E.g., configuring it to a value of 5 makes sure that the client has at
+      most 5 connections cached, with 5 different node managers. These
+      connections are timed out if idle for longer than the system-wide idle
+      timeout period. A token, if used for authentication, is used only at
+      connection creation time. If a new token is received, the earlier
+      connection should be closed in order to use the newer token. This and
+      yarn.client.nodemanager-client-async.thread-pool-max-size are related
+      and should be kept in sync (they need not be equal).
+  	</description>
+  	<name>yarn.client.max-nodemanagers-proxies</name>
+  	<value>500</value>
+  </property>
+  
   <!--Map Reduce configuration-->
   <property>
     <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/security/BaseNMTokenSecretManager.java Tue Jun 18 23:19:49 2013
@@ -114,4 +114,38 @@ public class BaseNMTokenSecretManager ex
   public NMTokenIdentifier createIdentifier() {
     return new NMTokenIdentifier();
   }
+  
+  /**
+   * Builds an NMToken for the given application attempt and node. The
+   * identifier carries the id of the current master key, and both it and the
+   * password are produced while the read lock is held.
+   */
+  public Token createNMToken(ApplicationAttemptId applicationAttemptId,
+      NodeId nodeId, String applicationSubmitter) {
+    NMTokenIdentifier tokenId;
+    byte[] secret;
+    readLock.lock();
+    try {
+      int keyId = currentMasterKey.getMasterKey().getKeyId();
+      tokenId = new NMTokenIdentifier(applicationAttemptId, nodeId,
+          applicationSubmitter, keyId);
+      secret = createPassword(tokenId);
+    } finally {
+      readLock.unlock();
+    }
+    return newInstance(secret, tokenId);
+  }
+  
+  /** Wraps an identifier and its password into an NMToken record. */
+  public static Token newInstance(byte[] password,
+      NMTokenIdentifier identifier) {
+    NodeId node = identifier.getNodeId();
+    // RPC layer client expects ip:port as service for tokens
+    InetSocketAddress socketAddr =
+        NetUtils.createSocketAddrForHost(node.getHost(), node.getPort());
+    byte[] idBytes = identifier.getBytes();
+    String kind = NMTokenIdentifier.KIND.toString();
+    String service = SecurityUtil.buildTokenService(socketAddr).toString();
+    return Token.newInstance(idBytes, kind, password, service);
+  }
 }

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java Tue Jun 18 23:19:49 2013
@@ -27,6 +27,7 @@ import java.util.Comparator;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
@@ -63,6 +64,8 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Builder utilities to construct various objects.
  *
@@ -152,7 +155,8 @@ public class BuilderUtils {
       int port, String user, Resource r, long expiryTime, int masterKeyId,
       byte[] password, long rmIdentifier) throws IOException {
     ContainerTokenIdentifier identifier =
-        new ContainerTokenIdentifier(cId, host, user, r, expiryTime,
+        new ContainerTokenIdentifier(cId, host + ":" + port, user, r,
+            expiryTime,
             masterKeyId, rmIdentifier);
     return newContainerToken(BuilderUtils.newNodeId(host, port), password,
         identifier);
@@ -228,6 +232,8 @@ public class BuilderUtils {
     return newToken(Token.class, identifier, kind, password, service);
   }
 
+  @Private
+  @VisibleForTesting
   public static Token newContainerToken(NodeId nodeId,
       byte[] password, ContainerTokenIdentifier tokenIdentifier) {
     // RPC layer client expects ip:port as service for tokens

Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1494369&r1=1494368&r2=1494369&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Tue Jun 18 23:19:49 2013
@@ -435,7 +435,7 @@ public class NodeManager extends Composi
   }
 
   @VisibleForTesting
-  Context getNMContext() {
+  public Context getNMContext() {
     return this.context;
   }